This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 99d268fd4e6 Subscription: improve nack for subscribing tsfile & delete 
tsfile that failed to poll (#12843)
99d268fd4e6 is described below

commit 99d268fd4e6b9aa610c0950c37bbc2e7bc0de4c2
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Jul 5 17:49:54 2024 +0800

    Subscription: improve nack for subscribing tsfile & delete tsfile that 
failed to poll (#12843)
---
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  |  80 ++---
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 177 +++++++---
 .../it/local/IoTDBSubscriptionBasicIT.java         | 302 +----------------
 .../it/local/IoTDBSubscriptionDataTypeIT.java      | 368 +++++++++++++++++++++
 .../consumer/SubscriptionConsumer.java             |  16 +-
 .../agent/SubscriptionBrokerAgent.java             |   3 +-
 .../db/subscription/broker/SubscriptionBroker.java |   8 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  86 +++--
 .../broker/SubscriptionPrefetchingTabletQueue.java |   6 +-
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  41 ++-
 .../db/subscription/event/SubscriptionEvent.java   |   7 +-
 11 files changed, 659 insertions(+), 435 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 9a0d7865afe..eccfc634e6b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -976,54 +976,54 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                         
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
                     for (final SubscriptionMessage message : messages) {
                       final short messageType = message.getMessageType();
-                      if 
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
-                        switch (SubscriptionMessageType.valueOf(messageType)) {
-                          case SESSION_DATA_SETS_HANDLER:
-                            {
-                              for (final SubscriptionSessionDataSet dataSet :
-                                  message.getSessionDataSetsHandler()) {
-                                final List<String> columnNameList = 
dataSet.getColumnNames();
-                                while (dataSet.hasNext()) {
-                                  final RowRecord record = dataSet.next();
-                                  if 
(!insertRowRecordEnrichedByConsumerGroupId(
-                                      columnNameList, record.getTimestamp(), 
consumerGroupId)) {
-                                    receiverCrashed.set(true);
-                                    throw new RuntimeException("detect 
receiver crashed");
-                                  }
+                      if 
(!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+                        LOGGER.warn("unexpected message type: {}", 
messageType);
+                        continue;
+                      }
+                      switch (SubscriptionMessageType.valueOf(messageType)) {
+                        case SESSION_DATA_SETS_HANDLER:
+                          {
+                            for (final SubscriptionSessionDataSet dataSet :
+                                message.getSessionDataSetsHandler()) {
+                              final List<String> columnNameList = 
dataSet.getColumnNames();
+                              while (dataSet.hasNext()) {
+                                final RowRecord record = dataSet.next();
+                                if (!insertRowRecordEnrichedByConsumerGroupId(
+                                    columnNameList, record.getTimestamp(), 
consumerGroupId)) {
+                                  receiverCrashed.set(true);
+                                  throw new RuntimeException("detect receiver 
crashed");
                                 }
                               }
-                              break;
                             }
-                          case TS_FILE_HANDLER:
-                            {
-                              try (final TsFileReader tsFileReader =
-                                  message.getTsFileHandler().openReader()) {
-                                final QueryDataSet dataSet =
-                                    tsFileReader.query(
-                                        QueryExpression.create(
-                                            Arrays.asList(
-                                                new Path("root.topic1", "s", 
true),
-                                                new Path("root.topic2", "s", 
true)),
-                                            null));
-                                while (dataSet.hasNext()) {
-                                  final RowRecord record = dataSet.next();
-                                  for (final Path path : dataSet.getPaths()) {
-                                    if 
(!insertRowRecordEnrichedByConsumerGroupId(
-                                        path.toString(), 
record.getTimestamp(), consumerGroupId)) {
-                                      receiverCrashed.set(true);
-                                      throw new RuntimeException("detect 
receiver crashed");
-                                    }
+                            break;
+                          }
+                        case TS_FILE_HANDLER:
+                          {
+                            try (final TsFileReader tsFileReader =
+                                message.getTsFileHandler().openReader()) {
+                              final QueryDataSet dataSet =
+                                  tsFileReader.query(
+                                      QueryExpression.create(
+                                          Arrays.asList(
+                                              new Path("root.topic1", "s", 
true),
+                                              new Path("root.topic2", "s", 
true)),
+                                          null));
+                              while (dataSet.hasNext()) {
+                                final RowRecord record = dataSet.next();
+                                for (final Path path : dataSet.getPaths()) {
+                                  if 
(!insertRowRecordEnrichedByConsumerGroupId(
+                                      path.toString(), record.getTimestamp(), 
consumerGroupId)) {
+                                    receiverCrashed.set(true);
+                                    throw new RuntimeException("detect 
receiver crashed");
                                   }
                                 }
                               }
-                              break;
                             }
-                          default:
-                            LOGGER.warn("unexpected message type: {}", 
messageType);
                             break;
-                        }
-                      } else {
-                        LOGGER.warn("unexpected message type: {}", 
messageType);
+                          }
+                        default:
+                          LOGGER.warn("unexpected message type: {}", 
messageType);
+                          break;
                       }
                     }
                     consumer.commitSync(messages);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 2f9cb124311..04a1f33772f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
 import org.apache.tsfile.read.TsFileReader;
@@ -84,7 +87,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   @Test
-  public void testTopicPathSubscription() throws Exception {
+  public void testTabletTopicWithPath() throws Exception {
+    
testTopicWithPathTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithPath() throws Exception {
+    testTopicWithPathTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithPathTemplate(final String topicFormat) throws 
Exception {
     // Insert some historical data on sender
     try (final ISession session = senderEnv.getSessionConnection()) {
       for (int i = 0; i < 100; ++i) {
@@ -110,6 +122,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
       config.put(TopicConstant.PATH_KEY, "root.db.*.s");
       session.createTopic(topicName, config);
     } catch (final Exception e) {
@@ -138,14 +151,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   final List<SubscriptionMessage> messages =
                       
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
-                  for (final SubscriptionMessage message : messages) {
-                    for (final Iterator<Tablet> it =
-                            
message.getSessionDataSetsHandler().tabletIterator();
-                        it.hasNext(); ) {
-                      final Tablet tablet = it.next();
-                      session.insertTablet(tablet);
-                    }
-                  }
+                  insertData(messages, session);
                   consumer.commitSync(messages);
                 }
                 consumer.unsubscribe(topicName);
@@ -185,7 +191,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   @Test
-  public void testTopicTimeSubscription() throws Exception {
+  public void testTabletTopicWithTime() throws Exception {
+    
testTopicWithTimeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithTime() throws Exception {
+    testTopicWithTimeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithTimeTemplate(final String topicFormat) throws 
Exception {
     // Insert some historical data on sender
     final long currentTime = System.currentTimeMillis();
     try (final ISession session = senderEnv.getSessionConnection()) {
@@ -208,6 +223,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
       config.put(TopicConstant.START_TIME_KEY, currentTime);
       session.createTopic(topicName, config);
     } catch (final Exception e) {
@@ -236,14 +252,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   final List<SubscriptionMessage> messages =
                       
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
-                  for (final SubscriptionMessage message : messages) {
-                    for (final Iterator<Tablet> it =
-                            
message.getSessionDataSetsHandler().tabletIterator();
-                        it.hasNext(); ) {
-                      final Tablet tablet = it.next();
-                      session.insertTablet(tablet);
-                    }
-                  }
+                  insertData(messages, session);
                   consumer.commitSync(messages);
                 }
                 consumer.unsubscribe(topicName);
@@ -282,7 +291,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   @Test
-  public void testTopicProcessorSubscription() throws Exception {
+  public void testTabletTopicWithProcessor() throws Exception {
+    
testTopicWithProcessorTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithProcessor() throws Exception {
+    testTopicWithProcessorTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithProcessorTemplate(final String topicFormat) throws 
Exception {
     // Insert some history data on sender
     try (final ISession session = senderEnv.getSessionConnection()) {
       session.executeNonQueryStatement(
@@ -300,6 +318,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
       config.put("processor", "tumbling-time-sampling-processor");
       config.put("processor.tumbling-time.interval-seconds", "1");
       config.put("processor.down-sampling.split-file", "true");
@@ -330,14 +349,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   final List<SubscriptionMessage> messages =
                       
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
-                  for (final SubscriptionMessage message : messages) {
-                    for (final Iterator<Tablet> it =
-                            
message.getSessionDataSetsHandler().tabletIterator();
-                        it.hasNext(); ) {
-                      final Tablet tablet = it.next();
-                      session.insertTablet(tablet);
-                    }
-                  }
+                  insertData(messages, session);
                   consumer.commitSync(messages);
                 }
                 consumer.unsubscribe(topicName);
@@ -500,7 +512,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   @Test
-  public void testTopicInvalidTimeRangeConfig() throws Exception {
+  public void testTopicWithInvalidTimeConfig() throws Exception {
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
 
@@ -530,7 +542,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   @Test
-  public void testTopicWithSnapshotMode() throws Exception {
+  public void testTabletTopicWithSnapshotMode() throws Exception {
+    
testTopicWithSnapshotModeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithSnapshotMode() throws Exception {
+    
testTopicWithSnapshotModeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithSnapshotModeTemplate(final String topicFormat) 
throws Exception {
     // Insert some historical data
     try (final ISession session = senderEnv.getSessionConnection()) {
       for (int i = 0; i < 100; ++i) {
@@ -544,13 +565,13 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
 
     // Create topic
-    final String topicName = "topic11";
+    final String topicName = "topic9";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
-      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
       config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
       session.createTopic(topicName, config);
     } catch (final Exception e) {
@@ -580,16 +601,37 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   final List<SubscriptionMessage> messages =
                       
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
                   for (final SubscriptionMessage message : messages) {
-                    try (final TsFileReader tsFileReader =
-                        message.getTsFileHandler().openReader()) {
-                      final Path path = new Path("root.db.d1", "s1", true);
-                      final QueryDataSet dataSet =
-                          tsFileReader.query(
-                              
QueryExpression.create(Collections.singletonList(path), null));
-                      while (dataSet.hasNext()) {
-                        dataSet.next();
-                        rowCount.addAndGet(1);
-                      }
+                    final short messageType = message.getMessageType();
+                    if 
(!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+                      LOGGER.warn("unexpected message type: {}", messageType);
+                      continue;
+                    }
+                    switch (SubscriptionMessageType.valueOf(messageType)) {
+                      case SESSION_DATA_SETS_HANDLER:
+                        for (final SubscriptionSessionDataSet dataSet :
+                            message.getSessionDataSetsHandler()) {
+                          while (dataSet.hasNext()) {
+                            dataSet.next();
+                            rowCount.addAndGet(1);
+                          }
+                        }
+                        break;
+                      case TS_FILE_HANDLER:
+                        try (final TsFileReader tsFileReader =
+                            message.getTsFileHandler().openReader()) {
+                          final Path path = new Path("root.db.d1", "s1", true);
+                          final QueryDataSet dataSet =
+                              tsFileReader.query(
+                                  
QueryExpression.create(Collections.singletonList(path), null));
+                          while (dataSet.hasNext()) {
+                            dataSet.next();
+                            rowCount.addAndGet(1);
+                          }
+                        }
+                        break;
+                      default:
+                        LOGGER.warn("unexpected message type: {}", 
messageType);
+                        break;
                     }
                   }
                   consumer.commitSync(messages);
@@ -633,7 +675,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   }
 
   @Test
-  public void testTopicWithLooseRange() throws Exception {
+  public void testTabletTopicWithLooseRange() throws Exception {
+    
testTopicWithLooseRangeTemplate(TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+  }
+
+  @Test
+  public void testTsFileTopicWithLooseRange() throws Exception {
+    
testTopicWithLooseRangeTemplate(TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+  }
+
+  private void testTopicWithLooseRangeTemplate(final String topicFormat) 
throws Exception {
     // Insert some historical data on sender
     try (final ISession session = senderEnv.getSessionConnection()) {
       session.executeNonQueryStatement(
@@ -649,12 +700,13 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
 
     // Create topic
-    final String topicName = "topic12";
+    final String topicName = "topic10";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
       config.put(TopicConstant.LOOSE_RANGE_KEY, 
TopicConstant.LOOSE_RANGE_ALL_VALUE);
       config.put(TopicConstant.PATH_KEY, "root.db.d1.at1");
       config.put(TopicConstant.START_TIME_KEY, "1500");
@@ -692,14 +744,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   if (dataPrepared.get()) {
                     final List<SubscriptionMessage> messages =
                         
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
-                    for (final SubscriptionMessage message : messages) {
-                      for (final Iterator<Tablet> it =
-                              
message.getSessionDataSetsHandler().tabletIterator();
-                          it.hasNext(); ) {
-                        final Tablet tablet = it.next();
-                        session.insertTablet(tablet);
-                      }
-                    }
+                    insertData(messages, session);
                     consumer.commitSync(messages);
                   }
                 }
@@ -762,6 +807,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
   }
 
+  /////////////////////////////// utility ///////////////////////////////
+
   private void assertTopicCount(final int count) throws Exception {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
@@ -770,4 +817,32 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       Assert.assertEquals(count, showTopicResult.size());
     }
   }
+
+  private void insertData(final List<SubscriptionMessage> messages, final 
ISession session)
+      throws Exception {
+    for (final SubscriptionMessage message : messages) {
+      final short messageType = message.getMessageType();
+      if (!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+        LOGGER.warn("unexpected message type: {}", messageType);
+        continue;
+      }
+      switch (SubscriptionMessageType.valueOf(messageType)) {
+        case SESSION_DATA_SETS_HANDLER:
+          for (final Iterator<Tablet> it = 
message.getSessionDataSetsHandler().tabletIterator();
+              it.hasNext(); ) {
+            final Tablet tablet = it.next();
+            session.insertTablet(tablet);
+          }
+          break;
+        case TS_FILE_HANDLER:
+          final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
+          session.executeNonQueryStatement(
+              String.format("load '%s'", 
tsFileHandler.getFile().getAbsolutePath()));
+          break;
+        default:
+          LOGGER.warn("unexpected message type: {}", messageType);
+          break;
+      }
+    }
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index e9c97c06896..83c5c9d1c85 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -30,21 +30,10 @@ import 
org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
-import org.apache.iotdb.session.subscription.payload.SubscriptionFileHandler;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
-import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
-import org.apache.tsfile.common.conf.TSFileConfig;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.TsFileReader;
-import org.apache.tsfile.read.common.Path;
-import org.apache.tsfile.read.common.RowRecord;
-import org.apache.tsfile.read.expression.QueryExpression;
-import org.apache.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.write.record.Tablet;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -52,12 +41,7 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
 import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,282 +59,6 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
 
-  @Test
-  public void testSubscribeBooleanData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.BOOLEAN, "true", true);
-  }
-
-  @Test
-  public void testSubscribeIntData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.INT32, "1", 1);
-  }
-
-  @Test
-  public void testSubscribeLongData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.INT64, "1", 1L);
-  }
-
-  @Test
-  public void testSubscribeFloatData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.FLOAT, "1.0", 1.0F);
-  }
-
-  @Test
-  public void testSubscribeDoubleData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.DOUBLE, "1.0", 1.0);
-  }
-
-  @Test
-  public void testSubscribeTextData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.TEXT, "'a'", new Binary("a", 
TSFileConfig.STRING_CHARSET));
-  }
-
-  @Test
-  public void testSubscribeTimestampData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.TIMESTAMP, "123", 123L);
-  }
-
-  @Test
-  public void testSubscribeDateData() throws Exception {
-    testBasicSubscribeTablets(TSDataType.DATE, "'2011-03-01'", 
LocalDate.of(2011, 3, 1));
-  }
-
-  @Test
-  public void testSubscribeBlobData() throws Exception {
-    testBasicSubscribeTablets(
-        TSDataType.BLOB, "X'f013'", new Binary(new byte[] {(byte) 0xf0, 
0x13}));
-  }
-
-  @Test
-  public void testSubscribeStringData() throws Exception {
-    testBasicSubscribeTablets(
-        TSDataType.STRING, "'a'", new Binary("a", 
TSFileConfig.STRING_CHARSET));
-  }
-
-  private void testBasicSubscribeTablets(
-      final TSDataType type, final String valueStr, final Object expectedData) 
throws Exception {
-    // Insert some historical data
-    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      session.executeNonQueryStatement(
-          String.format("create timeseries root.db.d1.s1 %s", 
type.toString()));
-      for (int i = 0; i < 100; ++i) {
-        session.executeNonQueryStatement(
-            String.format("insert into root.db.d1(time, s1) values (%s, %s)", 
i, valueStr));
-      }
-      session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    // Create topic
-    final String topicName = "topic1";
-    final String host = EnvFactory.getEnv().getIP();
-    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
-    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
-      session.open();
-      session.createTopic(topicName);
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    // Subscription
-    final AtomicInteger rowCount = new AtomicInteger();
-    final AtomicBoolean isClosed = new AtomicBoolean(false);
-    final Thread thread =
-        new Thread(
-            () -> {
-              try (final SubscriptionPullConsumer consumer =
-                  new SubscriptionPullConsumer.Builder()
-                      .host(host)
-                      .port(port)
-                      .consumerId("c1")
-                      .consumerGroupId("cg1")
-                      .autoCommit(false)
-                      .buildPullConsumer()) {
-                consumer.open();
-                consumer.subscribe(topicName);
-                while (!isClosed.get()) {
-                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
-                  final List<SubscriptionMessage> messages =
-                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
-                  for (final SubscriptionMessage message : messages) {
-                    for (final SubscriptionSessionDataSet dataSet :
-                        message.getSessionDataSetsHandler()) {
-                      while (dataSet.hasNext()) {
-                        final RowRecord record = dataSet.next();
-                        Assert.assertEquals(type.toString(), 
dataSet.getColumnTypes().get(1));
-                        Assert.assertEquals(type, 
record.getFields().get(0).getDataType());
-                        Assert.assertEquals(expectedData, getValue(type, 
dataSet.getTablet()));
-                        Assert.assertEquals(
-                            expectedData, 
record.getFields().get(0).getObjectValue(type));
-                        rowCount.addAndGet(1);
-                      }
-                    }
-                  }
-                  consumer.commitSync(messages);
-                }
-                consumer.unsubscribe(topicName);
-              } catch (final Exception e) {
-                e.printStackTrace();
-                // Avoid failure
-              } finally {
-                LOGGER.info("consumer exiting...");
-              }
-            },
-            String.format("%s - consumer", testName.getMethodName()));
-    thread.start();
-
-    // Check row count
-    try {
-      // Keep retrying if there are execution failures
-      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    } finally {
-      isClosed.set(true);
-      thread.join();
-    }
-  }
-
-  private Object getValue(final TSDataType type, final Tablet tablet) {
-    switch (type) {
-      case BOOLEAN:
-        return ((boolean[]) tablet.values[0])[0];
-      case INT32:
-        return ((int[]) tablet.values[0])[0];
-      case INT64:
-      case TIMESTAMP:
-        return ((long[]) tablet.values[0])[0];
-      case FLOAT:
-        return ((float[]) tablet.values[0])[0];
-      case DOUBLE:
-        return ((double[]) tablet.values[0])[0];
-      case TEXT:
-      case BLOB:
-      case STRING:
-        return ((Binary[]) tablet.values[0])[0];
-      case DATE:
-        return ((LocalDate[]) tablet.values[0])[0];
-      default:
-        return null;
-    }
-  }
-
-  @Test
-  public void testBasicSubscribeTsFile() throws Exception {
-    // Insert some historical data
-    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      for (int i = 0; i < 100; ++i) {
-        session.executeNonQueryStatement(
-            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
-      }
-      session.executeNonQueryStatement("flush");
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    // Create topic
-    final String topicName = "topic1";
-    final String host = EnvFactory.getEnv().getIP();
-    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
-    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
-      session.open();
-      final Properties config = new Properties();
-      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
-      session.createTopic(topicName, config);
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    // Record file handlers
-    final List<SubscriptionFileHandler> fileHandlers = new ArrayList<>();
-
-    // Subscription
-    final AtomicInteger rowCount = new AtomicInteger();
-    final AtomicBoolean isClosed = new AtomicBoolean(false);
-    final Thread thread =
-        new Thread(
-            () -> {
-              try (final SubscriptionPullConsumer consumer =
-                  new SubscriptionPullConsumer.Builder()
-                      .host(host)
-                      .port(port)
-                      .consumerId("c1")
-                      .consumerGroupId("cg1")
-                      .autoCommit(false)
-                      .fileSaveDir(System.getProperty("java.io.tmpdir")) // 
hack for license check
-                      .buildPullConsumer()) {
-                consumer.open();
-                consumer.subscribe(topicName);
-                while (!isClosed.get()) {
-                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
-                  final List<SubscriptionMessage> messages =
-                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
-                  for (final SubscriptionMessage message : messages) {
-                    final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
-                    fileHandlers.add(tsFileHandler);
-                    try (final TsFileReader tsFileReader = 
tsFileHandler.openReader()) {
-                      final Path path = new Path("root.db.d1", "s1", true);
-                      final QueryDataSet dataSet =
-                          tsFileReader.query(
-                              
QueryExpression.create(Collections.singletonList(path), null));
-                      while (dataSet.hasNext()) {
-                        dataSet.next();
-                        rowCount.addAndGet(1);
-                      }
-                    }
-                  }
-                  consumer.commitSync(messages);
-                }
-                consumer.unsubscribe(topicName);
-              } catch (final Exception e) {
-                e.printStackTrace();
-                // Avoid failure
-              } finally {
-                LOGGER.info("consumer exiting...");
-              }
-            },
-            String.format("%s - consumer", testName.getMethodName()));
-    thread.start();
-
-    // Check row count
-    try {
-      // Keep retrying if there are execution failures
-      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
-    } catch (final Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    } finally {
-      isClosed.set(true);
-      thread.join();
-    }
-
-    // Do something for file handlers
-    Assert.assertFalse(fileHandlers.isEmpty());
-    final SubscriptionFileHandler fileHandler = fileHandlers.get(0);
-    final java.nio.file.Path filePath = fileHandler.getPath();
-    Assert.assertTrue(Files.exists(filePath));
-
-    // Copy file
-    java.nio.file.Path tmpFilePath;
-    tmpFilePath = fileHandler.copyFile(Files.createTempFile(null, 
null).toAbsolutePath());
-    Assert.assertTrue(Files.exists(filePath));
-    Assert.assertTrue(Files.exists(tmpFilePath));
-
-    // Move file
-    tmpFilePath = fileHandler.moveFile(Files.createTempFile(null, 
null).toAbsolutePath());
-    Assert.assertFalse(Files.exists(filePath));
-    Assert.assertTrue(Files.exists(tmpFilePath));
-
-    // Delete file
-    Assert.assertThrows(NoSuchFileException.class, fileHandler::deleteFile);
-  }
-
   @Test
   public void testBasicPullConsumerWithCommitAsync() throws Exception {
     // Insert some historical data
@@ -511,7 +219,7 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
     }
 
     // Create topic
-    final String topicName = "topic1";
+    final String topicName = "topic2";
     final String host = EnvFactory.getEnv().getIP();
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
@@ -610,6 +318,8 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
     }
 
     // Create topic
+    final String topicName3 = "topic3";
+    final String topicName4 = "topic4";
     final String host = EnvFactory.getEnv().getIP();
     final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
@@ -617,12 +327,12 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
       {
         final Properties properties = new Properties();
         properties.put(TopicConstant.END_TIME_KEY, 99);
-        session.createTopic("topic1", properties);
+        session.createTopic(topicName3, properties);
       }
       {
         final Properties properties = new Properties();
         properties.put(TopicConstant.START_TIME_KEY, 100);
-        session.createTopic("topic2", properties);
+        session.createTopic(topicName4, properties);
       }
     } catch (final Exception e) {
       e.printStackTrace();
@@ -645,7 +355,7 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
                       .autoCommit(false)
                       .buildPullConsumer()) {
                 consumer.open();
-                consumer.subscribe("topic2"); // only subscribe topic2
+                consumer.subscribe(topicName4); // only subscribe topic4
                 while (!isClosed.get()) {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   final List<SubscriptionMessage> messages =
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionDataTypeIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionDataTypeIT.java
new file mode 100644
index 00000000000..413540a89c1
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionDataTypeIT.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDate;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionDataTypeIT extends AbstractSubscriptionLocalIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionDataTypeIT.class);
+
+  // ----------------------------- //
+  // SessionDataSetsHandler format //
+  // ----------------------------- //
+
+  @Test
+  public void testSubscribeTabletBooleanData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, 
TSDataType.BOOLEAN, "true", true);
+  }
+
+  @Test
+  public void testSubscribeTabletIntData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, 
TSDataType.INT32, "1", 1);
+  }
+
+  @Test
+  public void testSubscribeTabletLongData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, 
TSDataType.INT64, "1", 1L);
+  }
+
+  @Test
+  public void testSubscribeTabletFloatData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, 
TSDataType.FLOAT, "1.0", 1.0F);
+  }
+
+  @Test
+  public void testSubscribeTabletDoubleData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, 
TSDataType.DOUBLE, "1.0", 1.0);
+  }
+
+  @Test
+  public void testSubscribeTabletTextData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+        TSDataType.TEXT,
+        "'a'",
+        new Binary("a", TSFileConfig.STRING_CHARSET));
+  }
+
+  @Test
+  public void testSubscribeTabletTimestampData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE, 
TSDataType.TIMESTAMP, "123", 123L);
+  }
+
+  @Test
+  public void testSubscribeTabletDateData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+        TSDataType.DATE,
+        "'2011-03-01'",
+        LocalDate.of(2011, 3, 1));
+  }
+
+  @Test
+  public void testSubscribeBlobData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+        TSDataType.BLOB,
+        "X'f013'",
+        new Binary(new byte[] {(byte) 0xf0, 0x13}));
+  }
+
+  @Test
+  public void testSubscribeStringData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE,
+        TSDataType.STRING,
+        "'a'",
+        new Binary("a", TSFileConfig.STRING_CHARSET));
+  }
+
+  // -------------------- //
+  // TsFileHandler format //
+  // -------------------- //
+
+  @Test
+  public void testSubscribeTsFileBooleanData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.BOOLEAN, 
"true", true);
+  }
+
+  @Test
+  public void testSubscribeTsFileIntData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.INT32, "1", 1);
+  }
+
+  @Test
+  public void testSubscribeTsFileLongData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.INT64, "1", 1L);
+  }
+
+  @Test
+  public void testSubscribeTsFileFloatData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.FLOAT, "1.0", 
1.0F);
+  }
+
+  @Test
+  public void testSubscribeTsFileDoubleData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.DOUBLE, "1.0", 
1.0);
+  }
+
+  @Test
+  public void testSubscribeTsFileTextData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+        TSDataType.TEXT,
+        "'a'",
+        new Binary("a", TSFileConfig.STRING_CHARSET));
+  }
+
+  @Test
+  public void testSubscribeTsFileTimestampData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE, TSDataType.TIMESTAMP, 
"123", 123L);
+  }
+
+  @Test
+  public void testSubscribeTsFileDateData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+        TSDataType.DATE,
+        "'2011-03-01'",
+        LocalDate.of(2011, 3, 1));
+  }
+
+  @Test
+  public void testSubscribeTsFileBlobData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+        TSDataType.BLOB,
+        "X'f013'",
+        new Binary(new byte[] {(byte) 0xf0, 0x13}));
+  }
+
+  @Test
+  public void testSubscribeTsFileStringData() throws Exception {
+    testPullConsumerSubscribeDataTemplate(
+        TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE,
+        TSDataType.STRING,
+        "'a'",
+        new Binary("a", TSFileConfig.STRING_CHARSET));
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  private void testPullConsumerSubscribeDataTemplate(
+      final String topicFormat,
+      final TSDataType type,
+      final String valueStr,
+      final Object expectedData)
+      throws Exception {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      session.executeNonQueryStatement(
+          String.format("create timeseries root.db.d1.s1 %s", 
type.toString()));
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, %s)", 
i, valueStr));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic1";
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, topicFormat);
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger rowCount = new AtomicInteger();
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .fileSaveDir(System.getProperty("java.io.tmpdir")) // 
hack for license check
+                      .autoCommit(false)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  for (final SubscriptionMessage message : messages) {
+                    final short messageType = message.getMessageType();
+                    if 
(!SubscriptionMessageType.isValidatedMessageType(messageType)) {
+                      LOGGER.warn("unexpected message type: {}", messageType);
+                      continue;
+                    }
+                    switch (SubscriptionMessageType.valueOf(messageType)) {
+                      case SESSION_DATA_SETS_HANDLER:
+                        for (final SubscriptionSessionDataSet dataSet :
+                            message.getSessionDataSetsHandler()) {
+                          while (dataSet.hasNext()) {
+                            final RowRecord record = dataSet.next();
+                            Assert.assertEquals(type.toString(), 
dataSet.getColumnTypes().get(1));
+                            Assert.assertEquals(type, 
record.getFields().get(0).getDataType());
+                            Assert.assertEquals(expectedData, getValue(type, 
dataSet.getTablet()));
+                            Assert.assertEquals(
+                                expectedData, 
record.getFields().get(0).getObjectValue(type));
+                            rowCount.addAndGet(1);
+                          }
+                        }
+                        break;
+                      case TS_FILE_HANDLER:
+                        try (final TsFileReader tsFileReader =
+                            message.getTsFileHandler().openReader()) {
+                          final QueryDataSet dataSet =
+                              tsFileReader.query(
+                                  QueryExpression.create(
+                                      Collections.singletonList(new 
Path("root.db.d1", "s1", true)),
+                                      null));
+                          while (dataSet.hasNext()) {
+                            final RowRecord record = dataSet.next();
+                            Assert.assertEquals(type, 
record.getFields().get(0).getDataType());
+                            Assert.assertEquals(
+                                expectedData, 
record.getFields().get(0).getObjectValue(type));
+                            rowCount.addAndGet(1);
+                          }
+                        }
+                        break;
+                      default:
+                        LOGGER.warn("unexpected message type: {}", 
messageType);
+                        break;
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe(topicName);
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getMethodName()));
+    thread.start();
+
+    // Check row count
+    try {
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+
+  private Object getValue(final TSDataType type, final Tablet tablet) {
+    switch (type) {
+      case BOOLEAN:
+        return ((boolean[]) tablet.values[0])[0];
+      case INT32:
+        return ((int[]) tablet.values[0])[0];
+      case INT64:
+      case TIMESTAMP:
+        return ((long[]) tablet.values[0])[0];
+      case FLOAT:
+        return ((float[]) tablet.values[0])[0];
+      case DOUBLE:
+        return ((double[]) tablet.values[0])[0];
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return ((Binary[]) tablet.values[0])[0];
+      case DATE:
+        return ((LocalDate[]) tablet.values[0])[0];
+      default:
+        return null;
+    }
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index d54ed406774..1dad01aca00 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
 import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
 import org.apache.iotdb.session.subscription.util.IdentifierUtils;
 import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
 import org.apache.iotdb.session.subscription.util.SubscriptionPollTimer;
@@ -209,6 +210,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
 
   public synchronized void open() throws SubscriptionException {
     checkIfHasBeenClosed();
+
     if (!isClosed.get()) {
       return;
     }
@@ -495,7 +497,11 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     final File file = filePath.toFile();
     try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw")) 
{
       return Optional.of(pollFileInternal(commitContext, file, fileWriter));
-    } catch (final IOException e) {
+    } catch (final Exception e) {
+      // construct temporary message to nack
+      nack(
+          Collections.singletonList(
+              new SubscriptionMessage(commitContext, file.getAbsolutePath())));
       throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
     }
   }
@@ -740,6 +746,14 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     final Map<Integer, List<SubscriptionCommitContext>> 
dataNodeIdToSubscriptionCommitContexts =
         new HashMap<>();
     for (final SubscriptionMessage message : messages) {
+      // make every effort to delete stale intermediate file
+      if (Objects.equals(
+          SubscriptionMessageType.TS_FILE_HANDLER.getType(), 
message.getMessageType())) {
+        try {
+          message.getTsFileHandler().deleteFile();
+        } catch (final Exception ignored) {
+        }
+      }
       dataNodeIdToSubscriptionCommitContexts
           .computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> 
new ArrayList<>())
           .add(message.getCommitContext());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 21064ac34c8..f47f7cba0b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -94,7 +94,8 @@ public class SubscriptionBrokerAgent {
       LOGGER.warn(errorMessage);
       throw new SubscriptionException(errorMessage);
     }
-    return broker.commit(commitContexts, nack);
+    final String consumerId = consumerConfig.getConsumerId();
+    return broker.commit(consumerId, commitContexts, nack);
   }
 
   /////////////////////////////// broker ///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index b4b95895840..45dbeda8d5e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -122,7 +122,9 @@ public class SubscriptionBroker {
    * @return list of successful commit contexts
    */
   public List<SubscriptionCommitContext> commit(
-      final List<SubscriptionCommitContext> commitContexts, final boolean 
nack) {
+      final String consumerId,
+      final List<SubscriptionCommitContext> commitContexts,
+      final boolean nack) {
     final List<SubscriptionCommitContext> successfulCommitContexts = new 
ArrayList<>();
     for (final SubscriptionCommitContext commitContext : commitContexts) {
       final String topicName = commitContext.getTopicName();
@@ -138,11 +140,11 @@ public class SubscriptionBroker {
         continue;
       }
       if (!nack) {
-        if (prefetchingQueue.ack(commitContext)) {
+        if (prefetchingQueue.ack(consumerId, commitContext)) {
           successfulCommitContexts.add(commitContext);
         }
       } else {
-        if (prefetchingQueue.nack(commitContext)) {
+        if (prefetchingQueue.nack(consumerId, commitContext)) {
           successfulCommitContexts.add(commitContext);
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index df06665f162..e4b1c0d8711 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -77,6 +77,21 @@ public abstract class SubscriptionPrefetchingQueue {
     this.uncommittedEvents = new ConcurrentHashMap<>();
   }
 
+  public void cleanup() {
+    // clean up uncommitted events
+    uncommittedEvents.values().forEach(SubscriptionEvent::cleanup);
+    uncommittedEvents.clear();
+
+    // no need to clean up events in prefetchingQueue, since all events in 
prefetchingQueue are also
+    // in uncommittedEvents
+    prefetchingQueue.clear();
+
+    // no need to clean up events in inputPendingQueue, see
+    // 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
+  }
+
+  /////////////////////////////// poll ///////////////////////////////
+
   public SubscriptionEvent poll(final String consumerId) {
     if (prefetchingQueue.isEmpty()) {
       tryPrefetch();
@@ -121,23 +136,12 @@ public abstract class SubscriptionPrefetchingQueue {
     return null;
   }
 
-  /**
-   * @return {@code true} if a new event has been prefetched.
-   */
-  protected abstract boolean onEvent(final TabletInsertionEvent event);
-
-  /**
-   * @return {@code true} if a new event has been prefetched.
-   */
-  protected abstract boolean onEvent(final PipeTsFileInsertionEvent event);
+  /////////////////////////////// prefetch ///////////////////////////////
 
-  /**
-   * @return {@code true} if a new event has been prefetched.
-   */
-  protected abstract boolean trySealBatch();
+  public abstract void executePrefetch();
 
   /**
-   * prefetch at most one subscription event from {@link
+   * prefetch at most one {@link SubscriptionEvent} from {@link
    * SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
    * SubscriptionPrefetchingQueue#prefetchingQueue}
    */
@@ -188,27 +192,27 @@ public abstract class SubscriptionPrefetchingQueue {
     }
   }
 
-  public abstract void executePrefetch();
-
-  public void cleanup() {
-    // clean up uncommitted events
-    uncommittedEvents.values().forEach(SubscriptionEvent::cleanup);
-    uncommittedEvents.clear();
+  /**
+   * @return {@code true} if a new event has been prefetched.
+   */
+  protected abstract boolean onEvent(final TabletInsertionEvent event);
 
-    // no need to clean up events in prefetchingQueue, since all events in 
prefetchingQueue are also
-    // in uncommittedEvents
-    prefetchingQueue.clear();
+  /**
+   * @return {@code true} if a new event has been prefetched.
+   */
+  protected abstract boolean onEvent(final PipeTsFileInsertionEvent event);
 
-    // no need to clean up events in inputPendingQueue, see
-    // 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask.close
-  }
+  /**
+   * @return {@code true} if a new event has been prefetched.
+   */
+  protected abstract boolean trySealBatch();
 
   /////////////////////////////// commit ///////////////////////////////
 
   /**
    * @return {@code true} if ack successfully
    */
-  public boolean ack(final SubscriptionCommitContext commitContext) {
+  public boolean ack(final String consumerId, final SubscriptionCommitContext 
commitContext) {
     final SubscriptionEvent event = uncommittedEvents.get(commitContext);
     if (Objects.isNull(event)) {
       LOGGER.warn(
@@ -236,9 +240,20 @@ public abstract class SubscriptionPrefetchingQueue {
       return false;
     }
 
+    // check if a consumer acks event from another consumer group...
+    final String consumerGroupId = commitContext.getConsumerGroupId();
+    if (!Objects.equals(consumerGroupId, brokerId)) {
+      LOGGER.warn(
+          "inconsistent consumer group when acking event, current: {}, 
incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, 
commit it anyway...",
+          brokerId,
+          consumerGroupId,
+          consumerId,
+          commitContext,
+          this);
+    }
+
     event.ack();
     event.cleanup();
-
     event.recordCommittedTimestamp();
     uncommittedEvents.remove(commitContext);
     return true;
@@ -247,7 +262,7 @@ public abstract class SubscriptionPrefetchingQueue {
   /**
    * @return {@code true} if nack successfully
    */
-  public boolean nack(final SubscriptionCommitContext commitContext) {
+  public boolean nack(final String consumerId, final SubscriptionCommitContext 
commitContext) {
     final SubscriptionEvent event = uncommittedEvents.get(commitContext);
     if (Objects.isNull(event)) {
       LOGGER.warn(
@@ -256,6 +271,19 @@ public abstract class SubscriptionPrefetchingQueue {
           this);
       return false;
     }
+
+    // check if a consumer nacks event from another consumer group...
+    final String consumerGroupId = commitContext.getConsumerGroupId();
+    if (!Objects.equals(consumerGroupId, brokerId)) {
+      LOGGER.warn(
+          "inconsistent consumer group when nacking event, current: {}, 
incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, 
commit it anyway...",
+          brokerId,
+          consumerGroupId,
+          consumerId,
+          commitContext,
+          this);
+    }
+
     event.nack();
     return true;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 7bd42904d15..900bc4577fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -81,10 +81,12 @@ public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQ
         });
   }
 
+  /////////////////////////////// prefetch ///////////////////////////////
+
   @Override
   public void executePrefetch() {
-    tryPrefetch();
-    serializeEventsInQueue();
+    super.tryPrefetch();
+    this.serializeEventsInQueue();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 43aab7f8189..40c8980d44f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -93,6 +93,8 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
         });
   }
 
+  /////////////////////////////// poll ///////////////////////////////
+
   @Override
   public SubscriptionEvent poll(final String consumerId) {
     if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
@@ -257,6 +259,22 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     return event;
   }
 
+  /////////////////////////////// prefetch ///////////////////////////////
+
+  @Override
+  public synchronized void executePrefetch() {
+    super.tryPrefetch();
+
+    // prefetch remaining subscription events based on {@link 
consumerIdToSubscriptionEventMap}
+    for (final SubscriptionEvent event : 
consumerIdToSubscriptionEventMap.values()) {
+      try {
+        event.prefetchRemainingResponses();
+        event.trySerializeRemainingResponses();
+      } catch (final IOException ignored) {
+      }
+    }
+  }
+
   @Override
   protected boolean onEvent(final TabletInsertionEvent event) {
     final AtomicBoolean result = new AtomicBoolean(false);
@@ -337,18 +355,21 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     }
   }
 
-  @Override
-  public synchronized void executePrefetch() {
-    tryPrefetch();
+  /////////////////////////////// commit ///////////////////////////////
 
-    // prefetch remaining subscription events based on {@link 
consumerIdToSubscriptionEventMap}
-    for (final SubscriptionEvent event : 
consumerIdToSubscriptionEventMap.values()) {
-      try {
-        event.prefetchRemainingResponses();
-        event.trySerializeRemainingResponses();
-      } catch (final IOException ignored) {
+  /**
+   * @return {@code true} if nack successfully
+   */
+  @Override
+  public boolean nack(final String consumerId, final SubscriptionCommitContext 
commitContext) {
+    if (super.nack(consumerId, commitContext)) {
+      final SubscriptionEvent event = 
consumerIdToSubscriptionEventMap.get(consumerId);
+      if (Objects.nonNull(event) && Objects.equals(commitContext, 
event.getCommitContext())) {
+        consumerIdToSubscriptionEventMap.remove(consumerId);
       }
+      return true;
     }
+    return false;
   }
 
   /////////////////////////////// utility ///////////////////////////////
@@ -397,7 +418,7 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
 
       consumerIdToSubscriptionEventMap.remove(entry.getKey());
 
-      event.resetCurrentResponseIndex();
+      event.nack();
       consumerIdToSubscriptionEventMap.put(consumerId, event);
 
       event.recordLastPolledConsumerId(consumerId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
index c4cfac361c7..b8926a80316 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java
@@ -110,8 +110,8 @@ public class SubscriptionEvent {
     return responses[index];
   }
 
-  public void resetCurrentResponseIndex() {
-    currentResponseIndex = 0;
+  public SubscriptionCommitContext getCommitContext() {
+    return commitContext;
   }
 
   //////////////////////////// commit ////////////////////////////
@@ -169,6 +169,9 @@ public class SubscriptionEvent {
   }
 
   public void nack() {
+    // reset current response index
+    currentResponseIndex = 0;
+
     lastPolledConsumerId = null;
     lastPolledTimestamp = INVALID_TIMESTAMP;
   }

Reply via email to