This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cac4ce0d79c26920e96eb8fd9166128608d79802
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jul 24 11:18:26 2024 +0800

    Subscription: fix concurrent issues related to seal batch & use hybrid 
realtime extractor for tsfile format topic (#12996)
    
    > Special thanks to Chang Xue for providing the test scenarios.
    
    ### Shared consumption semantics after this PR
    
    While tablets are being converted to tsfiles, the generated tsfiles may 
contain duplicate data. **Therefore, under normal consumption (i.e., without 
re-consumption), consuming data in the tsfile format CANNOT guarantee that the 
total consumed data equals the written data. Only consumption in the dataset 
format guarantees that the total consumed data equals the written data**.
    
    ### Issue analysis
    
    It should be noted that the parameter `updateFunction` of the 
`getAndUpdate` method of `AtomicReference` is required to be a side-effect-free 
function, as `updateFunction` may be executed more than once when concurrent 
updates contend.
    
    If multiple threads call `batch.onEvent` at the same time, and if one 
thread returns true and seals the batch, the remaining threads still blocked on 
the `batch.onEvent` call may try to call `batch.onEvent` again, which could 
lead to sealing the same batch multiple times.
    
    (cherry picked from commit b540636c58aa069a61f8ed04d70581580ff5eecd)
---
 .../it/triple/AbstractSubscriptionTripleIT.java    |  78 +++
 .../it/triple/IoTDBSubscriptionSharingIT.java      | 560 +++++++++++++++++++++
 .../iotdb/rpc/subscription/config/TopicConfig.java |   5 +-
 .../batch/PipeTabletEventTsFileBatch.java          |   6 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  16 +-
 .../broker/SubscriptionPrefetchingTabletQueue.java |  58 +--
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  68 +--
 .../batch/SubscriptionPipeTabletEventBatch.java    |  87 +++-
 .../batch/SubscriptionPipeTsFileEventBatch.java    |  68 ++-
 9 files changed, 810 insertions(+), 136 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
new file mode 100644
index 00000000000..d0d23a0407f
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.triple;
+
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import 
org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
+
+import org.junit.After;
+import org.junit.Before;
+
+abstract class AbstractSubscriptionTripleIT extends AbstractSubscriptionIT {
+
+  protected BaseEnv sender;
+  protected BaseEnv receiver1;
+  protected BaseEnv receiver2;
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    // increase the number of threads to speed up testing
+    SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(4);
+    
SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(4);
+
+    MultiEnvFactory.createEnv(3);
+    sender = MultiEnvFactory.getEnv(0);
+    receiver1 = MultiEnvFactory.getEnv(1);
+    receiver2 = MultiEnvFactory.getEnv(2);
+
+    setUpConfig();
+
+    sender.initClusterEnvironment();
+    receiver1.initClusterEnvironment();
+    receiver2.initClusterEnvironment();
+  }
+
+  protected void setUpConfig() {
+    // enable auto create schema
+    sender.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+    receiver1.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+    receiver2.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
+
+    // 10 min, assert that the operations will not time out
+    sender.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+    receiver1.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+    receiver2.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    super.tearDown();
+
+    sender.cleanClusterEnvironment();
+    receiver1.cleanClusterEnvironment();
+    receiver2.cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/IoTDBSubscriptionSharingIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/IoTDBSubscriptionSharingIT.java
new file mode 100644
index 00000000000..0dc6edf1548
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/IoTDBSubscriptionSharingIT.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.triple;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2Subscription.class})
+public class IoTDBSubscriptionSharingIT extends AbstractSubscriptionTripleIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionSharingIT.class);
+
+  private final String topicNamePrefix = "topic_";
+  private final String databasePrefix = "root.test.g_";
+
+  private final AtomicLong rowCount00 = new AtomicLong(0);
+  private final AtomicLong rowCount10 = new AtomicLong(0);
+  private final AtomicLong rowCount70 = new AtomicLong(0);
+  private final AtomicLong rowCount90 = new AtomicLong(0);
+  private final AtomicLong rowCount6 = new AtomicLong(0);
+
+  private final String sql1 = "select count(s_0) from " + databasePrefix + 
"1.d_0";
+  private final String sql2 = "select count(s_0) from " + databasePrefix + 
"2.d_0";
+  private final String sql3 = "select count(s_0) from " + databasePrefix + 
"3.d_0";
+  private final String sql4 = "select count(s_0) from " + databasePrefix + 
"4.d_0";
+
+  private final List<MeasurementSchema> schemaList = new ArrayList<>(2);
+  private final List<SubscriptionPushConsumer> consumers = new ArrayList<>(10);
+
+  private void createTopic(
+      final String topicName,
+      final String path,
+      final String startTime,
+      final String endTime,
+      final boolean isTsFile) {
+    final String host = sender.getIP();
+    final int port = Integer.parseInt(sender.getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties properties = new Properties();
+      if (path != null) {
+        properties.setProperty(TopicConstant.PATH_KEY, path);
+      }
+      if (startTime != null) {
+        properties.setProperty(TopicConstant.START_TIME_KEY, startTime);
+      }
+      if (endTime != null) {
+        properties.setProperty(TopicConstant.END_TIME_KEY, endTime);
+      }
+      if (isTsFile) {
+        properties.setProperty(
+            TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      } else {
+        properties.setProperty(
+            TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+      }
+      session.createTopic(topicName, properties);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void insertData(long timestamp, final String device, final int rows) 
{
+    // Insert some data on sender
+    try (final ISession session = sender.getSessionConnection()) {
+      final Tablet tablet = new Tablet(device, schemaList, rows);
+      int rowIndex;
+      for (int row = 0; row < rows; row++) {
+        rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp);
+        tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, (row + 
1) * 1400 + row);
+        tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (row + 
1) * 100 + 0.5);
+        timestamp += 2000;
+      }
+      session.insertTablet(tablet);
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void createTopics() {
+    createTopic(
+        topicNamePrefix + 0,
+        databasePrefix + "0.**",
+        "2024-01-01T00:00:00+08:00",
+        "2024-03-31T23:59:59+08:00",
+        true);
+    createTopic(topicNamePrefix + 1, databasePrefix + "1.**", null, null, 
false);
+
+    createTopic(topicNamePrefix + 2, databasePrefix + "2.**", "now", null, 
false);
+    createTopic(topicNamePrefix + 3, databasePrefix + "3.**", null, "now", 
false);
+    createTopic(
+        topicNamePrefix + 4, databasePrefix + "4.**", null, 
"2024-03-31T23:59:59+08:00", false);
+
+    createTopic(topicNamePrefix + 6, databasePrefix + "6.**", "now", null, 
true);
+
+    createTopic(
+        topicNamePrefix + 5, databasePrefix + "5.**", 
"2024-01-01T00:00:00+08:00", null, false);
+    createTopic(
+        topicNamePrefix + 7, databasePrefix + "7.**", null, 
"2024-03-31T23:59:59+08:00", true);
+    createTopic(topicNamePrefix + 8, databasePrefix + "8.**", null, "now", 
true);
+    createTopic(
+        topicNamePrefix + 9, databasePrefix + "9.**", 
"2024-01-01T00:00:00+08:00", null, true);
+  }
+
+  private void insertDatum() {
+    long timestamp = 1706659200000L; // 2024-01-31 08:00:00+08:00
+
+    for (int i = 0; i < 20; i++) {
+      for (int k = 0; k < 10; k++) {
+        final String device = databasePrefix + k + ".d_0";
+        insertData(timestamp, device, 20);
+      }
+      timestamp += 40000;
+    }
+
+    for (int i = 0; i < 10; i++) {
+      final String device = databasePrefix + i + ".d_0";
+      insertData(System.currentTimeMillis(), device, 5);
+    }
+
+    final String device = databasePrefix + 2 + ".d_0";
+    for (int i = 0; i < 20; i++) {
+      insertData(System.currentTimeMillis(), device, 5);
+    }
+  }
+
+  private void preparePushConsumers() {
+    // create consumers
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_0")
+            .consumerGroupId("push_group_id_1")
+            .consumeListener(
+                message -> {
+                  try {
+                    try (final TsFileReader reader = 
message.getTsFileHandler().openReader()) {
+                      final QueryDataSet dataset =
+                          reader.query(
+                              QueryExpression.create(
+                                  Collections.singletonList(
+                                      new Path(databasePrefix + "0.d_0", 
"s_0", true)),
+                                  null));
+                      while (dataset.hasNext()) {
+                        rowCount00.addAndGet(1);
+                        dataset.next();
+                      }
+                    }
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_1")
+            .consumerGroupId("push_group_id_1")
+            .consumeListener(
+                message -> {
+                  try {
+                    try (final TsFileReader reader = 
message.getTsFileHandler().openReader()) {
+                      final QueryDataSet dataset =
+                          reader.query(
+                              QueryExpression.create(
+                                  Collections.singletonList(
+                                      new Path(databasePrefix + "0.d_0", 
"s_0", true)),
+                                  null));
+                      while (dataset.hasNext()) {
+                        rowCount10.addAndGet(1);
+                        dataset.next();
+                      }
+                    }
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_2")
+            .consumerGroupId("push_group_id_1")
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    try {
+                      
receiver1.getSessionConnection().insertTablet(dataSet.getTablet());
+                    } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_3")
+            .consumerGroupId("push_group_id_1")
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    try {
+                      
receiver2.getSessionConnection().insertTablet(dataSet.getTablet());
+                    } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_4")
+            .consumerGroupId("push_group_id_2")
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    try {
+                      
receiver1.getSessionConnection().insertTablet(dataSet.getTablet());
+                    } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_5")
+            .consumerGroupId("push_group_id_2")
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    try {
+                      
receiver2.getSessionConnection().insertTablet(dataSet.getTablet());
+                    } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_6")
+            .consumerGroupId("push_group_id_2")
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    try {
+                      
receiver1.getSessionConnection().insertTablet(dataSet.getTablet());
+                    } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_7")
+            .consumerGroupId("push_group_id_3")
+            .consumeListener(
+                message -> {
+                  final short messageType = message.getMessageType();
+                  if 
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
+                    switch (SubscriptionMessageType.valueOf(messageType)) {
+                      case SESSION_DATA_SETS_HANDLER:
+                        for (final SubscriptionSessionDataSet dataSet :
+                            message.getSessionDataSetsHandler()) {
+                          try {
+                            
receiver1.getSessionConnection().insertTablet(dataSet.getTablet());
+                          } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                            throw new RuntimeException(e);
+                          }
+                        }
+                        break;
+                      case TS_FILE_HANDLER:
+                        try (final TsFileReader reader = 
message.getTsFileHandler().openReader()) {
+                          final QueryDataSet dataset =
+                              reader.query(
+                                  QueryExpression.create(
+                                      Collections.singletonList(
+                                          new Path(databasePrefix + "0.d_0", 
"s_0", true)),
+                                      null));
+                          while (dataset.hasNext()) {
+                            rowCount70.addAndGet(1);
+                            dataset.next();
+                          }
+                        } catch (final IOException e) {
+                          throw new RuntimeException(e);
+                        }
+                        break;
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_8")
+            .consumerGroupId("push_group_id_3")
+            .consumeListener(
+                message -> {
+                  try (final TsFileReader reader = 
message.getTsFileHandler().openReader()) {
+                    final QueryDataSet dataset =
+                        reader.query(
+                            QueryExpression.create(
+                                Collections.singletonList(
+                                    new Path(databasePrefix + 6 + ".d_0", 
"s_0", true)),
+                                null));
+                    while (dataset.hasNext()) {
+                      rowCount6.addAndGet(1);
+                      dataset.next();
+                    }
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+    consumers.add(
+        new SubscriptionPushConsumer.Builder()
+            .host(sender.getIP())
+            .port(Integer.parseInt(sender.getPort()))
+            .consumerId("consumer_id_9")
+            .consumerGroupId("push_group_id_3")
+            .consumeListener(
+                message -> {
+                  final short messageType = message.getMessageType();
+                  if 
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
+                    switch (SubscriptionMessageType.valueOf(messageType)) {
+                      case SESSION_DATA_SETS_HANDLER:
+                        for (final SubscriptionSessionDataSet dataSet :
+                            message.getSessionDataSetsHandler()) {
+                          try {
+                            
receiver2.getSessionConnection().insertTablet(dataSet.getTablet());
+                          } catch (final StatementExecutionException | 
IoTDBConnectionException e) {
+                            throw new RuntimeException(e);
+                          }
+                        }
+                        break;
+                      case TS_FILE_HANDLER:
+                        try (final TsFileReader reader = 
message.getTsFileHandler().openReader()) {
+                          final QueryDataSet dataset =
+                              reader.query(
+                                  QueryExpression.create(
+                                      Collections.singletonList(
+                                          new Path(databasePrefix + "0.d_0", 
"s_0", true)),
+                                      null));
+                          while (dataset.hasNext()) {
+                            rowCount90.addAndGet(1);
+                            dataset.next();
+                          }
+                        } catch (final IOException e) {
+                          throw new RuntimeException(e);
+                        }
+                        break;
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer());
+
+    // open consumers
+    for (final SubscriptionPushConsumer consumer : consumers) {
+      consumer.open();
+    }
+
+    // subscribe topics
+    consumers.get(0).subscribe(topicNamePrefix + 0);
+    consumers.get(1).subscribe(topicNamePrefix + 0);
+    consumers.get(2).subscribe(topicNamePrefix + 1);
+    consumers.get(3).subscribe(topicNamePrefix + 1);
+    consumers.get(4).subscribe(topicNamePrefix + 2, topicNamePrefix + 3);
+    consumers.get(5).subscribe(topicNamePrefix + 3, topicNamePrefix + 4);
+    consumers.get(6).subscribe(topicNamePrefix + 2, topicNamePrefix + 4);
+    consumers.get(7).subscribe(topicNamePrefix + 0, topicNamePrefix + 3);
+    consumers.get(8).subscribe(topicNamePrefix + 6);
+    consumers.get(9).subscribe(topicNamePrefix + 0, topicNamePrefix + 3);
+  }
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+
+    // prepare schemaList
+    schemaList.add(new MeasurementSchema("s_0", TSDataType.INT32));
+    schemaList.add(new MeasurementSchema("s_1", TSDataType.DOUBLE));
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    // log some info
+    try {
+      LOGGER.info("[src] {} = {}", sql1, getCount(sender, sql1));
+      LOGGER.info("[dest1] {} = {}", sql1, getCount(receiver1, sql1));
+      LOGGER.info("[dest2] {} = {}", sql1, getCount(receiver2, sql1));
+
+      LOGGER.info("[src] {} = {}", sql2, getCount(sender, sql2));
+      LOGGER.info("[dest1] {} = {}", sql2, getCount(receiver1, sql2));
+      LOGGER.info("[dest2] {} = {}", sql2, getCount(receiver2, sql2));
+
+      LOGGER.info("[src] {} = {}", sql3, getCount(sender, sql3));
+      LOGGER.info("[dest1] {} = {}", sql3, getCount(receiver1, sql3));
+      LOGGER.info("[dest2] {} = {}", sql3, getCount(receiver2, sql3));
+
+      LOGGER.info("[src] {} = {}", sql4, getCount(sender, sql4));
+      LOGGER.info("[dest1] {} = {}", sql4, getCount(receiver1, sql4));
+      LOGGER.info("[dest2] {} = {}", sql4, getCount(receiver2, sql4));
+    } catch (final Exception ignored) {
+    }
+
+    LOGGER.info("rowCount00 = {}", rowCount00.get());
+    LOGGER.info("rowCount10 = {}", rowCount10.get());
+    LOGGER.info("rowCount70 = {}", rowCount70.get());
+    LOGGER.info("rowCount90 = {}", rowCount90.get());
+    LOGGER.info("rowCount6 = {}", rowCount6.get());
+
+    // close consumers
+    for (final SubscriptionPushConsumer consumer : consumers) {
+      try {
+        consumer.close();
+      } catch (final Exception ignored) {
+      }
+    }
+
+    super.tearDown();
+  }
+
+  @Test
+  public void testSubscriptionSharing() {
+    createTopics();
+    preparePushConsumers();
+    insertDatum();
+
+    AWAIT.untilAsserted(
+        () -> {
+          // "c0,c1|topic0"
+          final long topic0Group1Total = rowCount00.get() + rowCount10.get();
+          // TODO: ensure that the total consumption of tsfile format data 
equals the written data
+          Assert.assertTrue(400 <= topic0Group1Total && topic0Group1Total <= 
800);
+
+          // "c2,c3|topic1"
+          Assert.assertEquals(
+              getCount(sender, sql1), getCount(receiver1, sql1) + 
getCount(receiver2, sql1));
+
+          // "c4,c6|topic2"
+          Assert.assertEquals(105, getCount(receiver1, sql2) + 
getCount(receiver2, sql2));
+
+          // "c4,c5|c7,c9|topic3"
+          final long topic3Total = getCount(receiver1, sql3) + 
getCount(receiver2, sql3);
+          Assert.assertTrue(400 <= topic3Total && topic3Total <= 800);
+
+          // "c5,c6|topic4"
+          Assert.assertEquals(400, getCount(receiver1, sql4) + 
getCount(receiver2, sql4));
+
+          // "c7,c9|topic0"
+          final long topic0Group3Total = rowCount70.get() + rowCount90.get();
+          // TODO: ensure that the total consumption of tsfile format data 
equals the written data
+          Assert.assertTrue(400 <= topic0Group3Total && topic0Group3Total <= 
800);
+
+          // "c8|topic6"
+          Assert.assertEquals(5, rowCount6.get());
+        });
+  }
+
+  private static long getCount(final BaseEnv env, final String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try (final ISession session = env.getSessionConnection()) {
+      final SessionDataSet dataSet = session.executeQueryStatement(sql);
+      return dataSet.hasNext() ? dataSet.next().getFields().get(0).getLongV() 
: 0;
+    }
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 2509213446e..178c5a652f5 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -113,10 +113,7 @@ public class TopicConfig extends PipeParameters {
   }
 
   public Map<String, String> getAttributesWithRealtimeMode() {
-    return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
-            attributes.getOrDefault(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_DEFAULT_VALUE))
-        ? REALTIME_BATCH_MODE_CONFIG
-        : REALTIME_STREAM_MODE_CONFIG;
+    return REALTIME_STREAM_MODE_CONFIG;
   }
 
   public Map<String, String> getAttributesWithSourceMode() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index b27b0b467b3..57a73935304 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -216,7 +216,7 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
     }
 
     // Sort the devices by device id
-    List<String> devices = new ArrayList<>(device2Tablets.keySet());
+    final List<String> devices = new ArrayList<>(device2Tablets.keySet());
     devices.sort(Comparator.naturalOrder());
 
     // Replace ArrayList with LinkedList to improve performance
@@ -311,8 +311,8 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
   }
 
   private void tryBestToWriteTabletsIntoOneFile(
-      LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList,
-      Map<String, Boolean> device2Aligned)
+      final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList,
+      final Map<String, Boolean> device2Aligned)
       throws IOException, WriteProcessException {
     final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
         device2TabletsLinkedList.entrySet().iterator();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index f88368d01c8..909d3dc53a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -147,11 +147,11 @@ public abstract class SubscriptionPrefetchingQueue {
    * <p>It will continuously attempt to prefetch and generate a {@link 
SubscriptionEvent} until
    * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
    *
-   * @param trySealBatchIfEmpty {@code true} if {@link 
SubscriptionPrefetchingQueue#trySealBatch} is
-   *     called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is 
empty, {@code false}
+   * @param onEventIfEmpty {@code true} if {@link 
SubscriptionPrefetchingQueue#onEvent()} is called
+   *     when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty, 
{@code false}
    *     otherwise
    */
-  protected void tryPrefetch(final boolean trySealBatchIfEmpty) {
+  protected void tryPrefetch(final boolean onEventIfEmpty) {
     while (!inputPendingQueue.isEmpty()) {
       final Event event = 
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
       if (Objects.isNull(event)) {
@@ -202,14 +202,14 @@ public abstract class SubscriptionPrefetchingQueue {
           "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent 
{} when prefetching.",
           this,
           event);
-      if (trySealBatch()) {
+      if (onEvent()) {
         return;
       }
     }
 
     // At this moment, the inputPendingQueue is empty.
-    if (trySealBatchIfEmpty) {
-      trySealBatch();
+    if (onEventIfEmpty) {
+      onEvent();
     }
   }
 
@@ -226,7 +226,7 @@ public abstract class SubscriptionPrefetchingQueue {
   /**
    * @return {@code true} if a new event has been prefetched.
    */
-  protected abstract boolean trySealBatch();
+  protected abstract boolean onEvent();
 
   /////////////////////////////// commit ///////////////////////////////
 
@@ -310,7 +310,7 @@ public abstract class SubscriptionPrefetchingQueue {
     return true;
   }
 
-  protected SubscriptionCommitContext generateSubscriptionCommitContext() {
+  public SubscriptionCommitContext generateSubscriptionCommitContext() {
     // Recording data node ID and reboot times to address potential stale 
commit IDs caused by
     // leader transfers or restarts.
     return new SubscriptionCommitContext(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index b0f69da2e76..ddd38f50a59 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -24,14 +24,9 @@ import 
org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch;
-import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
 
-import org.apache.tsfile.write.record.Tablet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue {
+public class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQueue {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPrefetchingTabletQueue.class);
@@ -62,7 +57,7 @@ class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQueue {
     super(brokerId, topicName, inputPendingQueue);
 
     this.currentBatchRef.set(
-        new SubscriptionPipeTabletEventBatch(BATCH_MAX_DELAY_IN_MS, 
BATCH_MAX_SIZE_IN_BYTES));
+        new SubscriptionPipeTabletEventBatch(this, BATCH_MAX_DELAY_IN_MS, 
BATCH_MAX_SIZE_IN_BYTES));
   }
 
   @Override
@@ -97,51 +92,34 @@ class SubscriptionPrefetchingTabletQueue extends 
SubscriptionPrefetchingQueue {
     return onEventInternal(event);
   }
 
-  private boolean onEventInternal(final EnrichedEvent event) {
-    final AtomicBoolean result = new AtomicBoolean(false);
-    currentBatchRef.getAndUpdate(
-        (batch) -> {
-          if (batch.onEvent(event)) {
-            sealBatch(batch);
-            result.set(true);
-            return new SubscriptionPipeTabletEventBatch(
-                BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
-          }
-          return batch;
-        });
-    return result.get();
+  @Override
+  protected boolean onEvent() {
+    return onEventInternal(null);
   }
 
-  @Override
-  protected boolean trySealBatch() {
+  private boolean onEventInternal(@Nullable final EnrichedEvent event) {
     final AtomicBoolean result = new AtomicBoolean(false);
     currentBatchRef.getAndUpdate(
         (batch) -> {
-          if (batch.shouldEmit()) {
-            sealBatch(batch);
+          final List<SubscriptionEvent> evs = batch.onEvent(event);
+          if (!evs.isEmpty()) {
+            evs.forEach(
+                (ev) -> {
+                  uncommittedEvents.put(ev.getCommitContext(), ev); // before 
enqueuing the event
+                  prefetchingQueue.add(ev);
+                });
             result.set(true);
             return new SubscriptionPipeTabletEventBatch(
-                BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
+                this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
           }
+          // If onEvent returns an empty list, one possibility is that the 
batch has already been
+          // sealed by another thread; the CAS (weakCompareAndSetVolatile) 
in getAndUpdate is
+          // then expected to fail, so the update function is retried with 
the latest batch.
           return batch;
         });
     return result.get();
   }
 
-  private void sealBatch(final SubscriptionPipeTabletEventBatch batch) {
-    final List<Tablet> tablets = batch.sealTablets();
-    final SubscriptionCommitContext commitContext = 
generateSubscriptionCommitContext();
-    final SubscriptionEvent subscriptionEvent =
-        new SubscriptionEvent(
-            new SubscriptionPipeTabletBatchEvents(batch),
-            new SubscriptionPollResponse(
-                SubscriptionPollResponseType.TABLETS.getType(),
-                new TabletsPayload(tablets),
-                commitContext));
-    uncommittedEvents.put(commitContext, subscriptionEvent); // before 
enqueuing the event
-    prefetchingQueue.add(subscriptionEvent);
-  }
-
   /**
    * serialize uncommitted and pollable events in {@link
    * SubscriptionPrefetchingQueue#prefetchingQueue}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index a461553b87a..2b70ea7d56a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import 
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
-import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
 import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
@@ -35,6 +34,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseTy
 
 import com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +46,9 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue {
+public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPrefetchingTsFileQueue.class);
@@ -72,7 +71,7 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
     this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>();
     this.currentBatchRef.set(
-        new SubscriptionPipeTsFileEventBatch(BATCH_MAX_DELAY_IN_MS, 
BATCH_MAX_SIZE_IN_BYTES));
+        new SubscriptionPipeTsFileEventBatch(this, BATCH_MAX_DELAY_IN_MS, 
BATCH_MAX_SIZE_IN_BYTES));
   }
 
   @Override
@@ -305,26 +304,7 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
 
   @Override
   protected boolean onEvent(final TabletInsertionEvent event) {
-    final AtomicBoolean result = new AtomicBoolean(false);
-    currentBatchRef.getAndUpdate(
-        (batch) -> {
-          try {
-            if (batch.onEvent(event)) {
-              sealBatch(batch);
-              result.set(true);
-              return new SubscriptionPipeTsFileEventBatch(
-                  BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
-            }
-            return batch;
-          } catch (final Exception e) {
-            LOGGER.warn(
-                "Exception occurred when SubscriptionPrefetchingTsFileQueue {} 
sealing tsFiles from batch",
-                this,
-                e);
-            return batch;
-          }
-        });
-    return result.get();
+    return onEventInternal(event);
   }
 
   @Override
@@ -343,22 +323,35 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
   }
 
   @Override
-  protected boolean trySealBatch() {
+  protected boolean onEvent() {
+    return onEventInternal(null);
+  }
+
+  private boolean onEventInternal(@Nullable final TabletInsertionEvent event) {
     final AtomicBoolean result = new AtomicBoolean(false);
     currentBatchRef.getAndUpdate(
         (batch) -> {
           try {
-            if (batch.shouldEmit()) {
-              sealBatch(batch);
+            final List<SubscriptionEvent> evs = batch.onEvent(event);
+            if (!evs.isEmpty()) {
+              evs.forEach(
+                  (ev) -> {
+                    uncommittedEvents.put(ev.getCommitContext(), ev); // 
before enqueuing the event
+                    prefetchingQueue.add(ev);
+                  });
               result.set(true);
               return new SubscriptionPipeTsFileEventBatch(
-                  BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
+                  this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
             }
+            // If onEvent returns an empty list, one possibility is that the 
batch has already been
+            // sealed by another thread; the CAS (weakCompareAndSetVolatile) 
in getAndUpdate is
+            // then expected to fail, so the update function is retried with 
the latest batch.
             return batch;
           } catch (final Exception e) {
             LOGGER.warn(
-                "Exception occurred when SubscriptionPrefetchingTsFileQueue {} 
sealing TsFile from batch",
+                "Exception occurred when SubscriptionPrefetchingTsFileQueue {} 
sealing TsFiles from batch {}",
                 this,
+                batch,
                 e);
             return batch;
           }
@@ -366,23 +359,6 @@ class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQueue {
     return result.get();
   }
 
-  private void sealBatch(final SubscriptionPipeTsFileEventBatch batch) throws 
Exception {
-    final List<File> tsFiles = batch.sealTsFiles();
-    final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size());
-    for (final File tsFile : tsFiles) {
-      final SubscriptionCommitContext commitContext = 
generateSubscriptionCommitContext();
-      final SubscriptionEvent subscriptionEvent =
-          new SubscriptionEvent(
-              new SubscriptionPipeTsFileBatchEvents(batch, tsFile, 
referenceCount),
-              new SubscriptionPollResponse(
-                  SubscriptionPollResponseType.FILE_INIT.getType(),
-                  new FileInitPayload(tsFile.getName()),
-                  commitContext));
-      uncommittedEvents.put(commitContext, subscriptionEvent); // before 
enqueuing the event
-      prefetchingQueue.add(subscriptionEvent);
-    }
-  }
-
   /////////////////////////////// commit ///////////////////////////////
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
index 892bcaabb68..b0ef2b85116 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java
@@ -24,9 +24,17 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
 
 import org.apache.tsfile.write.record.Tablet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +49,8 @@ public class SubscriptionPipeTabletEventBatch {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class);
 
+  private final SubscriptionPrefetchingTabletQueue prefetchingQueue;
+
   private final List<EnrichedEvent> enrichedEvents = new ArrayList<>();
   private final List<Tablet> tablets = new ArrayList<>();
 
@@ -50,30 +60,67 @@ public class SubscriptionPipeTabletEventBatch {
   private final long maxBatchSizeInBytes;
   private long totalBufferSize = 0;
 
-  public SubscriptionPipeTabletEventBatch(final int maxDelayInMs, final long 
maxBatchSizeInBytes) {
+  private boolean isSealed = false;
+
+  public SubscriptionPipeTabletEventBatch(
+      final SubscriptionPrefetchingTabletQueue prefetchingQueue,
+      final int maxDelayInMs,
+      final long maxBatchSizeInBytes) {
+    this.prefetchingQueue = prefetchingQueue;
     this.maxDelayInMs = maxDelayInMs;
     this.maxBatchSizeInBytes = maxBatchSizeInBytes;
   }
 
-  public synchronized List<Tablet> sealTablets() {
-    return tablets;
+  public synchronized List<SubscriptionEvent> onEvent(@Nullable final 
EnrichedEvent event) {
+    if (isSealed) {
+      return Collections.emptyList();
+    }
+    if (Objects.nonNull(event)) {
+      constructBatch(event);
+    }
+    if (shouldEmit()) {
+      final List<SubscriptionEvent> events = generateSubscriptionEvents();
+      isSealed = true;
+      return events;
+    }
+    return Collections.emptyList();
   }
 
-  public synchronized boolean shouldEmit() {
-    return totalBufferSize >= maxBatchSizeInBytes
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  public synchronized void ack() {
+    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
+    }
+  }
+
+  public synchronized void cleanup() {
+    // clear the reference count of events
+    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
+      enrichedEvent.clearReferenceCount(this.getClass().getName());
+    }
+  }
+
+  private List<SubscriptionEvent> generateSubscriptionEvents() {
+    final SubscriptionCommitContext commitContext =
+        prefetchingQueue.generateSubscriptionCommitContext();
+    return Collections.singletonList(
+        new SubscriptionEvent(
+            new SubscriptionPipeTabletBatchEvents(this),
+            new SubscriptionPollResponse(
+                SubscriptionPollResponseType.TABLETS.getType(),
+                new TabletsPayload(tablets),
+                commitContext)));
   }
 
-  public synchronized boolean onEvent(final EnrichedEvent event) {
+  private void constructBatch(final EnrichedEvent event) {
     if (event instanceof TabletInsertionEvent) {
       final List<Tablet> currentTablets = 
convertToTablets((TabletInsertionEvent) event);
       if (currentTablets.isEmpty()) {
-        return shouldEmit();
+        return;
       }
       tablets.addAll(currentTablets);
       totalBufferSize +=
           currentTablets.stream()
-              .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
+              .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
               .reduce(Long::sum)
               .orElse(0L);
       enrichedEvents.add(event);
@@ -90,7 +137,7 @@ public class SubscriptionPipeTabletEventBatch {
         tablets.addAll(currentTablets);
         totalBufferSize +=
             currentTablets.stream()
-                .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
+                .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
                 .reduce(Long::sum)
                 .orElse(0L);
       }
@@ -99,21 +146,11 @@ public class SubscriptionPipeTabletEventBatch {
         firstEventProcessingTime = System.currentTimeMillis();
       }
     }
-
-    return shouldEmit();
-  }
-
-  public synchronized void ack() {
-    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-      enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
-    }
   }
 
-  public synchronized void cleanup() {
-    // clear the reference count of events
-    for (final EnrichedEvent enrichedEvent : enrichedEvents) {
-      enrichedEvent.clearReferenceCount(this.getClass().getName());
-    }
+  private boolean shouldEmit() {
+    return totalBufferSize >= maxBatchSizeInBytes
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
   private List<Tablet> convertToTablets(final TabletInsertionEvent 
tabletInsertionEvent) {
@@ -131,6 +168,8 @@ public class SubscriptionPipeTabletEventBatch {
     return Collections.emptyList();
   }
 
+  /////////////////////////////// stringify ///////////////////////////////
+
   public String toString() {
     return "SubscriptionPipeTabletEventBatch{enrichedEvents="
         + 
enrichedEvents.stream().map(EnrichedEvent::coreReportMessage).collect(Collectors.toList())
@@ -144,6 +183,8 @@ public class SubscriptionPipeTabletEventBatch {
         + maxBatchSizeInBytes
         + ", totalBufferSize="
         + totalBufferSize
+        + ", isSealed="
+        + isSealed
         + "}";
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
index 93f6a9464b4..c36f9ca3964 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java
@@ -20,30 +20,54 @@
 package org.apache.iotdb.db.subscription.event.batch;
 
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import 
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class SubscriptionPipeTsFileEventBatch {
 
+  private final SubscriptionPrefetchingTsFileQueue prefetchingQueue;
+
   private final PipeTabletEventTsFileBatch batch;
 
+  private boolean isSealed = false;
+
   public SubscriptionPipeTsFileEventBatch(
-      final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
+      final SubscriptionPrefetchingTsFileQueue prefetchingQueue,
+      final int maxDelayInMs,
+      final long requestMaxBatchSizeInBytes) {
+    this.prefetchingQueue = prefetchingQueue;
     this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, 
requestMaxBatchSizeInBytes);
   }
 
-  public synchronized List<File> sealTsFiles() throws Exception {
-    return batch.sealTsFiles();
-  }
-
-  public synchronized boolean shouldEmit() {
-    return batch.shouldEmit();
-  }
-
-  public synchronized boolean onEvent(final TabletInsertionEvent event) throws 
Exception {
-    return batch.onEvent(event);
+  public synchronized List<SubscriptionEvent> onEvent(@Nullable final 
TabletInsertionEvent event)
+      throws Exception {
+    if (isSealed) {
+      return Collections.emptyList();
+    }
+    if (Objects.nonNull(event)) {
+      batch.onEvent(event);
+    }
+    if (batch.shouldEmit()) {
+      final List<SubscriptionEvent> events = generateSubscriptionEvents();
+      isSealed = true;
+      return events;
+    }
+    return Collections.emptyList();
   }
 
   public synchronized void ack() {
@@ -55,7 +79,27 @@ public class SubscriptionPipeTsFileEventBatch {
     batch.close();
   }
 
+  private List<SubscriptionEvent> generateSubscriptionEvents() throws 
Exception {
+    final List<SubscriptionEvent> events = new ArrayList<>();
+    final List<File> tsFiles = batch.sealTsFiles();
+    final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size());
+    for (final File tsFile : tsFiles) {
+      final SubscriptionCommitContext commitContext =
+          prefetchingQueue.generateSubscriptionCommitContext();
+      events.add(
+          new SubscriptionEvent(
+              new SubscriptionPipeTsFileBatchEvents(this, tsFile, 
referenceCount),
+              new SubscriptionPollResponse(
+                  SubscriptionPollResponseType.FILE_INIT.getType(),
+                  new FileInitPayload(tsFile.getName()),
+                  commitContext)));
+    }
+    return events;
+  }
+
+  /////////////////////////////// stringify ///////////////////////////////
+
   public String toString() {
-    return "SubscriptionPipeTsFileEventBatch{batch=" + batch + "}";
+    return "SubscriptionPipeTsFileEventBatch{batch=" + batch + ", isSealed=" + 
isSealed + "}";
   }
 }

Reply via email to