This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cac4ce0d79c26920e96eb8fd9166128608d79802 Author: V_Galaxy <[email protected]> AuthorDate: Wed Jul 24 11:18:26 2024 +0800 Subscription: fix concurrent issues related to seal batch & use hybrid realtime extractor for tsfile format topic (#12996) > Special thanks to Chang Xue for providing the test scenarios. ### Shared consumption semantics after this PR During the process of converting a tablet to tsfile, it is possible that the generated tsfiles contain duplicate data. **Therefore, under normal consumption (no re-consumption situation), the tsfile format data CANNOT ensure that the total consumed data equals the written data. It can only be guaranteed that the total consumed data in the dataset format equals the written data**. ### Issue analysis It should be noted that the parameter `updateFunction` of the `getAndUpdate` method of `AtomicReference` is required to be a side-effect-free function, as `updateFunction` may be executed concurrently. If multiple threads call `batch.onEvent` at the same time, and if one thread returns true and seals the batch, the remaining threads still blocked on the `batch.onEvent` call may try to call `batch.onEvent` again, which could lead to sealing the same batch multiple times. 
(cherry picked from commit b540636c58aa069a61f8ed04d70581580ff5eecd) --- .../it/triple/AbstractSubscriptionTripleIT.java | 78 +++ .../it/triple/IoTDBSubscriptionSharingIT.java | 560 +++++++++++++++++++++ .../iotdb/rpc/subscription/config/TopicConfig.java | 5 +- .../batch/PipeTabletEventTsFileBatch.java | 6 +- .../broker/SubscriptionPrefetchingQueue.java | 16 +- .../broker/SubscriptionPrefetchingTabletQueue.java | 58 +-- .../broker/SubscriptionPrefetchingTsFileQueue.java | 68 +-- .../batch/SubscriptionPipeTabletEventBatch.java | 87 +++- .../batch/SubscriptionPipeTsFileEventBatch.java | 68 ++- 9 files changed, 810 insertions(+), 136 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java new file mode 100644 index 00000000000..d0d23a0407f --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.subscription.it.triple; + +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.session.subscription.consumer.SubscriptionExecutorServiceManager; +import org.apache.iotdb.subscription.it.AbstractSubscriptionIT; + +import org.junit.After; +import org.junit.Before; + +abstract class AbstractSubscriptionTripleIT extends AbstractSubscriptionIT { + + protected BaseEnv sender; + protected BaseEnv receiver1; + protected BaseEnv receiver2; + + @Override + @Before + public void setUp() { + super.setUp(); + + // increase the number of threads to speed up testing + SubscriptionExecutorServiceManager.setControlFlowExecutorCorePoolSize(4); + SubscriptionExecutorServiceManager.setDownstreamDataFlowExecutorCorePoolSize(4); + + MultiEnvFactory.createEnv(3); + sender = MultiEnvFactory.getEnv(0); + receiver1 = MultiEnvFactory.getEnv(1); + receiver2 = MultiEnvFactory.getEnv(2); + + setUpConfig(); + + sender.initClusterEnvironment(); + receiver1.initClusterEnvironment(); + receiver2.initClusterEnvironment(); + } + + protected void setUpConfig() { + // enable auto create schema + sender.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiver1.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + receiver2.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); + + // 10 min, assert that the operations will not time out + sender.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + receiver1.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + receiver2.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + } + + @Override + @After + public void tearDown() { + super.tearDown(); + + sender.cleanClusterEnvironment(); + receiver1.cleanClusterEnvironment(); + receiver2.cleanClusterEnvironment(); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/IoTDBSubscriptionSharingIT.java 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/IoTDBSubscriptionSharingIT.java new file mode 100644 index 00000000000..0dc6edf1548 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/IoTDBSubscriptionSharingIT.java @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.subscription.it.triple; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.consumer.ConsumeResult; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; +import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.TsFileReader; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.expression.QueryExpression; +import org.apache.tsfile.read.query.dataset.QueryDataSet; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2Subscription.class}) +public class IoTDBSubscriptionSharingIT extends AbstractSubscriptionTripleIT { + + private static final Logger 
LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionSharingIT.class); + + private final String topicNamePrefix = "topic_"; + private final String databasePrefix = "root.test.g_"; + + private final AtomicLong rowCount00 = new AtomicLong(0); + private final AtomicLong rowCount10 = new AtomicLong(0); + private final AtomicLong rowCount70 = new AtomicLong(0); + private final AtomicLong rowCount90 = new AtomicLong(0); + private final AtomicLong rowCount6 = new AtomicLong(0); + + private final String sql1 = "select count(s_0) from " + databasePrefix + "1.d_0"; + private final String sql2 = "select count(s_0) from " + databasePrefix + "2.d_0"; + private final String sql3 = "select count(s_0) from " + databasePrefix + "3.d_0"; + private final String sql4 = "select count(s_0) from " + databasePrefix + "4.d_0"; + + private final List<MeasurementSchema> schemaList = new ArrayList<>(2); + private final List<SubscriptionPushConsumer> consumers = new ArrayList<>(10); + + private void createTopic( + final String topicName, + final String path, + final String startTime, + final String endTime, + final boolean isTsFile) { + final String host = sender.getIP(); + final int port = Integer.parseInt(sender.getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties properties = new Properties(); + if (path != null) { + properties.setProperty(TopicConstant.PATH_KEY, path); + } + if (startTime != null) { + properties.setProperty(TopicConstant.START_TIME_KEY, startTime); + } + if (endTime != null) { + properties.setProperty(TopicConstant.END_TIME_KEY, endTime); + } + if (isTsFile) { + properties.setProperty( + TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE); + } else { + properties.setProperty( + TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE); + } + session.createTopic(topicName, properties); + } catch (final Exception e) { + e.printStackTrace(); + 
fail(e.getMessage()); + } + } + + private void insertData(long timestamp, final String device, final int rows) { + // Insert some data on sender + try (final ISession session = sender.getSessionConnection()) { + final Tablet tablet = new Tablet(device, schemaList, rows); + int rowIndex; + for (int row = 0; row < rows; row++) { + rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, (row + 1) * 1400 + row); + tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (row + 1) * 100 + 0.5); + timestamp += 2000; + } + session.insertTablet(tablet); + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void createTopics() { + createTopic( + topicNamePrefix + 0, + databasePrefix + "0.**", + "2024-01-01T00:00:00+08:00", + "2024-03-31T23:59:59+08:00", + true); + createTopic(topicNamePrefix + 1, databasePrefix + "1.**", null, null, false); + + createTopic(topicNamePrefix + 2, databasePrefix + "2.**", "now", null, false); + createTopic(topicNamePrefix + 3, databasePrefix + "3.**", null, "now", false); + createTopic( + topicNamePrefix + 4, databasePrefix + "4.**", null, "2024-03-31T23:59:59+08:00", false); + + createTopic(topicNamePrefix + 6, databasePrefix + "6.**", "now", null, true); + + createTopic( + topicNamePrefix + 5, databasePrefix + "5.**", "2024-01-01T00:00:00+08:00", null, false); + createTopic( + topicNamePrefix + 7, databasePrefix + "7.**", null, "2024-03-31T23:59:59+08:00", true); + createTopic(topicNamePrefix + 8, databasePrefix + "8.**", null, "now", true); + createTopic( + topicNamePrefix + 9, databasePrefix + "9.**", "2024-01-01T00:00:00+08:00", null, true); + } + + private void insertDatum() { + long timestamp = 1706659200000L; // 2024-01-31 08:00:00+08:00 + + for (int i = 0; i < 20; i++) { + for (int k = 0; k < 10; k++) { + final String device = databasePrefix + k + ".d_0"; + 
insertData(timestamp, device, 20); + } + timestamp += 40000; + } + + for (int i = 0; i < 10; i++) { + final String device = databasePrefix + i + ".d_0"; + insertData(System.currentTimeMillis(), device, 5); + } + + final String device = databasePrefix + 2 + ".d_0"; + for (int i = 0; i < 20; i++) { + insertData(System.currentTimeMillis(), device, 5); + } + } + + private void preparePushConsumers() { + // create consumers + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_0") + .consumerGroupId("push_group_id_1") + .consumeListener( + message -> { + try { + try (final TsFileReader reader = message.getTsFileHandler().openReader()) { + final QueryDataSet dataset = + reader.query( + QueryExpression.create( + Collections.singletonList( + new Path(databasePrefix + "0.d_0", "s_0", true)), + null)); + while (dataset.hasNext()) { + rowCount00.addAndGet(1); + dataset.next(); + } + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_1") + .consumerGroupId("push_group_id_1") + .consumeListener( + message -> { + try { + try (final TsFileReader reader = message.getTsFileHandler().openReader()) { + final QueryDataSet dataset = + reader.query( + QueryExpression.create( + Collections.singletonList( + new Path(databasePrefix + "0.d_0", "s_0", true)), + null)); + while (dataset.hasNext()) { + rowCount10.addAndGet(1); + dataset.next(); + } + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_2") + 
.consumerGroupId("push_group_id_1") + .consumeListener( + message -> { + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver1.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_3") + .consumerGroupId("push_group_id_1") + .consumeListener( + message -> { + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver2.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_4") + .consumerGroupId("push_group_id_2") + .consumeListener( + message -> { + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver1.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_5") + .consumerGroupId("push_group_id_2") + .consumeListener( + message -> { + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver2.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final 
StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_6") + .consumerGroupId("push_group_id_2") + .consumeListener( + message -> { + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver1.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_7") + .consumerGroupId("push_group_id_3") + .consumeListener( + message -> { + final short messageType = message.getMessageType(); + if (SubscriptionMessageType.isValidatedMessageType(messageType)) { + switch (SubscriptionMessageType.valueOf(messageType)) { + case SESSION_DATA_SETS_HANDLER: + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver1.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + break; + case TS_FILE_HANDLER: + try (final TsFileReader reader = message.getTsFileHandler().openReader()) { + final QueryDataSet dataset = + reader.query( + QueryExpression.create( + Collections.singletonList( + new Path(databasePrefix + "0.d_0", "s_0", true)), + null)); + while (dataset.hasNext()) { + rowCount70.addAndGet(1); + dataset.next(); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + break; + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + 
new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_8") + .consumerGroupId("push_group_id_3") + .consumeListener( + message -> { + try (final TsFileReader reader = message.getTsFileHandler().openReader()) { + final QueryDataSet dataset = + reader.query( + QueryExpression.create( + Collections.singletonList( + new Path(databasePrefix + 6 + ".d_0", "s_0", true)), + null)); + while (dataset.hasNext()) { + rowCount6.addAndGet(1); + dataset.next(); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + consumers.add( + new SubscriptionPushConsumer.Builder() + .host(sender.getIP()) + .port(Integer.parseInt(sender.getPort())) + .consumerId("consumer_id_9") + .consumerGroupId("push_group_id_3") + .consumeListener( + message -> { + final short messageType = message.getMessageType(); + if (SubscriptionMessageType.isValidatedMessageType(messageType)) { + switch (SubscriptionMessageType.valueOf(messageType)) { + case SESSION_DATA_SETS_HANDLER: + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + try { + receiver2.getSessionConnection().insertTablet(dataSet.getTablet()); + } catch (final StatementExecutionException | IoTDBConnectionException e) { + throw new RuntimeException(e); + } + } + break; + case TS_FILE_HANDLER: + try (final TsFileReader reader = message.getTsFileHandler().openReader()) { + final QueryDataSet dataset = + reader.query( + QueryExpression.create( + Collections.singletonList( + new Path(databasePrefix + "0.d_0", "s_0", true)), + null)); + while (dataset.hasNext()) { + rowCount90.addAndGet(1); + dataset.next(); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + break; + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()); + + // open consumers + for (final SubscriptionPushConsumer consumer : consumers) { + 
consumer.open(); + } + + // subscribe topics + consumers.get(0).subscribe(topicNamePrefix + 0); + consumers.get(1).subscribe(topicNamePrefix + 0); + consumers.get(2).subscribe(topicNamePrefix + 1); + consumers.get(3).subscribe(topicNamePrefix + 1); + consumers.get(4).subscribe(topicNamePrefix + 2, topicNamePrefix + 3); + consumers.get(5).subscribe(topicNamePrefix + 3, topicNamePrefix + 4); + consumers.get(6).subscribe(topicNamePrefix + 2, topicNamePrefix + 4); + consumers.get(7).subscribe(topicNamePrefix + 0, topicNamePrefix + 3); + consumers.get(8).subscribe(topicNamePrefix + 6); + consumers.get(9).subscribe(topicNamePrefix + 0, topicNamePrefix + 3); + } + + @Override + @Before + public void setUp() { + super.setUp(); + + // prepare schemaList + schemaList.add(new MeasurementSchema("s_0", TSDataType.INT32)); + schemaList.add(new MeasurementSchema("s_1", TSDataType.DOUBLE)); + } + + @Override + @After + public void tearDown() { + // log some info + try { + LOGGER.info("[src] {} = {}", sql1, getCount(sender, sql1)); + LOGGER.info("[dest1] {} = {}", sql1, getCount(receiver1, sql1)); + LOGGER.info("[dest2] {} = {}", sql1, getCount(receiver2, sql1)); + + LOGGER.info("[src] {} = {}", sql2, getCount(sender, sql2)); + LOGGER.info("[dest1] {} = {}", sql2, getCount(receiver1, sql2)); + LOGGER.info("[dest2] {} = {}", sql2, getCount(receiver2, sql2)); + + LOGGER.info("[src] {} = {}", sql3, getCount(sender, sql3)); + LOGGER.info("[dest1] {} = {}", sql3, getCount(receiver1, sql3)); + LOGGER.info("[dest2] {} = {}", sql3, getCount(receiver2, sql3)); + + LOGGER.info("[src] {} = {}", sql4, getCount(sender, sql4)); + LOGGER.info("[dest1] {} = {}", sql4, getCount(receiver1, sql4)); + LOGGER.info("[dest2] {} = {}", sql4, getCount(receiver2, sql4)); + } catch (final Exception ignored) { + } + + LOGGER.info("rowCount00 = {}", rowCount00.get()); + LOGGER.info("rowCount10 = {}", rowCount10.get()); + LOGGER.info("rowCount70 = {}", rowCount70.get()); + LOGGER.info("rowCount90 = {}", 
rowCount90.get()); + LOGGER.info("rowCount6 = {}", rowCount6.get()); + + // close consumers + for (final SubscriptionPushConsumer consumer : consumers) { + try { + consumer.close(); + } catch (final Exception ignored) { + } + } + + super.tearDown(); + } + + @Test + public void testSubscriptionSharing() { + createTopics(); + preparePushConsumers(); + insertDatum(); + + AWAIT.untilAsserted( + () -> { + // "c0,c1|topic0" + final long topic0Group1Total = rowCount00.get() + rowCount10.get(); + // TODO: ensure that the total consumption of tsfile format data equals the written data + Assert.assertTrue(400 <= topic0Group1Total && topic0Group1Total <= 800); + + // "c2,c3|topic1" + Assert.assertEquals( + getCount(sender, sql1), getCount(receiver1, sql1) + getCount(receiver2, sql1)); + + // "c4,c6|topic2" + Assert.assertEquals(105, getCount(receiver1, sql2) + getCount(receiver2, sql2)); + + // "c4,c5|c7,c9|topic3" + final long topic3Total = getCount(receiver1, sql3) + getCount(receiver2, sql3); + Assert.assertTrue(400 <= topic3Total && topic3Total <= 800); + + // "c5,c6|topic4" + Assert.assertEquals(400, getCount(receiver1, sql4) + getCount(receiver2, sql4)); + + // "c7,c9|topic0" + final long topic0Group3Total = rowCount70.get() + rowCount90.get(); + // TODO: ensure that the total consumption of tsfile format data equals the written data + Assert.assertTrue(400 <= topic0Group3Total && topic0Group3Total <= 800); + + // "c8|topic6" + Assert.assertEquals(5, rowCount6.get()); + }); + } + + private static long getCount(final BaseEnv env, final String sql) + throws IoTDBConnectionException, StatementExecutionException { + try (final ISession session = env.getSessionConnection()) { + final SessionDataSet dataSet = session.executeQueryStatement(sql); + return dataSet.hasNext() ? 
dataSet.next().getFields().get(0).getLongV() : 0; + } + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java index 2509213446e..178c5a652f5 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java @@ -113,10 +113,7 @@ public class TopicConfig extends PipeParameters { } public Map<String, String> getAttributesWithRealtimeMode() { - return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase( - attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE)) - ? REALTIME_BATCH_MODE_CONFIG - : REALTIME_STREAM_MODE_CONFIG; + return REALTIME_STREAM_MODE_CONFIG; } public Map<String, String> getAttributesWithSourceMode() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index b27b0b467b3..57a73935304 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -216,7 +216,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { } // Sort the devices by device id - List<String> devices = new ArrayList<>(device2Tablets.keySet()); + final List<String> devices = new ArrayList<>(device2Tablets.keySet()); devices.sort(Comparator.naturalOrder()); // Replace ArrayList with LinkedList to improve performance @@ -311,8 +311,8 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { } 
private void tryBestToWriteTabletsIntoOneFile( - LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList, - Map<String, Boolean> device2Aligned) + final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList, + final Map<String, Boolean> device2Aligned) throws IOException, WriteProcessException { final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator = device2TabletsLinkedList.entrySet().iterator(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index f88368d01c8..909d3dc53a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -147,11 +147,11 @@ public abstract class SubscriptionPrefetchingQueue { * <p>It will continuously attempt to prefetch and generate a {@link SubscriptionEvent} until * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty. 
* - * @param trySealBatchIfEmpty {@code true} if {@link SubscriptionPrefetchingQueue#trySealBatch} is - * called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty, {@code false} + * @param onEventIfEmpty {@code true} if {@link SubscriptionPrefetchingQueue#onEvent()} is called + * when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty, {@code false} * otherwise */ - protected void tryPrefetch(final boolean trySealBatchIfEmpty) { + protected void tryPrefetch(final boolean onEventIfEmpty) { while (!inputPendingQueue.isEmpty()) { final Event event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()); if (Objects.isNull(event)) { @@ -202,14 +202,14 @@ public abstract class SubscriptionPrefetchingQueue { "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", this, event); - if (trySealBatch()) { + if (onEvent()) { return; } } // At this moment, the inputPendingQueue is empty. - if (trySealBatchIfEmpty) { - trySealBatch(); + if (onEventIfEmpty) { + onEvent(); } } @@ -226,7 +226,7 @@ public abstract class SubscriptionPrefetchingQueue { /** * @return {@code true} if a new event has been prefetched. */ - protected abstract boolean trySealBatch(); + protected abstract boolean onEvent(); /////////////////////////////// commit /////////////////////////////// @@ -310,7 +310,7 @@ public abstract class SubscriptionPrefetchingQueue { return true; } - protected SubscriptionCommitContext generateSubscriptionCommitContext() { + public SubscriptionCommitContext generateSubscriptionCommitContext() { // Recording data node ID and reboot times to address potential stale commit IDs caused by // leader transfers or restarts. 
return new SubscriptionCommitContext( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index b0f69da2e76..ddd38f50a59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -24,14 +24,9 @@ import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTabletEventBatch; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; -import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; -import org.apache.tsfile.write.record.Tablet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue { +public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingTabletQueue.class); @@ -62,7 +57,7 @@ class 
SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue { super(brokerId, topicName, inputPendingQueue); this.currentBatchRef.set( - new SubscriptionPipeTabletEventBatch(BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES)); + new SubscriptionPipeTabletEventBatch(this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES)); } @Override @@ -97,51 +92,34 @@ class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue { return onEventInternal(event); } - private boolean onEventInternal(final EnrichedEvent event) { - final AtomicBoolean result = new AtomicBoolean(false); - currentBatchRef.getAndUpdate( - (batch) -> { - if (batch.onEvent(event)) { - sealBatch(batch); - result.set(true); - return new SubscriptionPipeTabletEventBatch( - BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); - } - return batch; - }); - return result.get(); + @Override + protected boolean onEvent() { + return onEventInternal(null); } - @Override - protected boolean trySealBatch() { + private boolean onEventInternal(@Nullable final EnrichedEvent event) { final AtomicBoolean result = new AtomicBoolean(false); currentBatchRef.getAndUpdate( (batch) -> { - if (batch.shouldEmit()) { - sealBatch(batch); + final List<SubscriptionEvent> evs = batch.onEvent(event); + if (!evs.isEmpty()) { + evs.forEach( + (ev) -> { + uncommittedEvents.put(ev.getCommitContext(), ev); // before enqueuing the event + prefetchingQueue.add(ev); + }); result.set(true); return new SubscriptionPipeTabletEventBatch( - BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); + this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); } + // If onEvent returns an empty list, one possibility is that the batch has already been + // sealed, which would result in the failure of weakCompareAndSetVolatile to obtain the + // most recent batch. 
return batch; }); return result.get(); } - private void sealBatch(final SubscriptionPipeTabletEventBatch batch) { - final List<Tablet> tablets = batch.sealTablets(); - final SubscriptionCommitContext commitContext = generateSubscriptionCommitContext(); - final SubscriptionEvent subscriptionEvent = - new SubscriptionEvent( - new SubscriptionPipeTabletBatchEvents(batch), - new SubscriptionPollResponse( - SubscriptionPollResponseType.TABLETS.getType(), - new TabletsPayload(tablets), - commitContext)); - uncommittedEvents.put(commitContext, subscriptionEvent); // before enqueuing the event - prefetchingQueue.add(subscriptionEvent); - } - /** * serialize uncommitted and pollable events in {@link * SubscriptionPrefetchingQueue#prefetchingQueue} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index a461553b87a..2b70ea7d56a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload; @@ -35,6 +34,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseTy import com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,10 +46,9 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { +public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingTsFileQueue.class); @@ -72,7 +71,7 @@ class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>(); this.currentBatchRef.set( - new SubscriptionPipeTsFileEventBatch(BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES)); + new SubscriptionPipeTsFileEventBatch(this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES)); } @Override @@ -305,26 +304,7 @@ class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { @Override protected boolean onEvent(final TabletInsertionEvent event) { - final AtomicBoolean result = new AtomicBoolean(false); - currentBatchRef.getAndUpdate( - (batch) -> { - try { - if (batch.onEvent(event)) { - sealBatch(batch); - result.set(true); - return new SubscriptionPipeTsFileEventBatch( - BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); - } - return batch; - } catch (final Exception e) { - LOGGER.warn( - "Exception occurred when SubscriptionPrefetchingTsFileQueue {} sealing tsFiles from batch", - this, - e); - return batch; - } - }); - return result.get(); + return onEventInternal(event); } @Override @@ -343,22 +323,35 @@ class 
SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { } @Override - protected boolean trySealBatch() { + protected boolean onEvent() { + return onEventInternal(null); + } + + private boolean onEventInternal(@Nullable final TabletInsertionEvent event) { final AtomicBoolean result = new AtomicBoolean(false); currentBatchRef.getAndUpdate( (batch) -> { try { - if (batch.shouldEmit()) { - sealBatch(batch); + final List<SubscriptionEvent> evs = batch.onEvent(event); + if (!evs.isEmpty()) { + evs.forEach( + (ev) -> { + uncommittedEvents.put(ev.getCommitContext(), ev); // before enqueuing the event + prefetchingQueue.add(ev); + }); result.set(true); return new SubscriptionPipeTsFileEventBatch( - BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); + this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES); } + // If onEvent returns an empty list, one possibility is that the batch has already been + // sealed, which would result in the failure of weakCompareAndSetVolatile to obtain the + // most recent batch. 
return batch; } catch (final Exception e) { LOGGER.warn( - "Exception occurred when SubscriptionPrefetchingTsFileQueue {} sealing TsFile from batch", + "Exception occurred when SubscriptionPrefetchingTsFileQueue {} sealing TsFiles from batch {}", this, + batch, e); return batch; } @@ -366,23 +359,6 @@ class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { return result.get(); } - private void sealBatch(final SubscriptionPipeTsFileEventBatch batch) throws Exception { - final List<File> tsFiles = batch.sealTsFiles(); - final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size()); - for (final File tsFile : tsFiles) { - final SubscriptionCommitContext commitContext = generateSubscriptionCommitContext(); - final SubscriptionEvent subscriptionEvent = - new SubscriptionEvent( - new SubscriptionPipeTsFileBatchEvents(batch, tsFile, referenceCount), - new SubscriptionPollResponse( - SubscriptionPollResponseType.FILE_INIT.getType(), - new FileInitPayload(tsFile.getName()), - commitContext)); - uncommittedEvents.put(commitContext, subscriptionEvent); // before enqueuing the event - prefetchingQueue.add(subscriptionEvent); - } - } - /////////////////////////////// commit /////////////////////////////// /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 892bcaabb68..b0ef2b85116 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -24,9 +24,17 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTabletQueue; +import org.apache.iotdb.db.subscription.event.SubscriptionEvent; +import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTabletBatchEvents; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; +import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload; import org.apache.tsfile.write.record.Tablet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +49,8 @@ public class SubscriptionPipeTabletEventBatch { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPipeTabletEventBatch.class); + private final SubscriptionPrefetchingTabletQueue prefetchingQueue; + private final List<EnrichedEvent> enrichedEvents = new ArrayList<>(); private final List<Tablet> tablets = new ArrayList<>(); @@ -50,30 +60,67 @@ public class SubscriptionPipeTabletEventBatch { private final long maxBatchSizeInBytes; private long totalBufferSize = 0; - public SubscriptionPipeTabletEventBatch(final int maxDelayInMs, final long maxBatchSizeInBytes) { + private boolean isSealed = false; + + public SubscriptionPipeTabletEventBatch( + final SubscriptionPrefetchingTabletQueue prefetchingQueue, + final int maxDelayInMs, + final long maxBatchSizeInBytes) { + this.prefetchingQueue = prefetchingQueue; this.maxDelayInMs = maxDelayInMs; this.maxBatchSizeInBytes = maxBatchSizeInBytes; } - public synchronized List<Tablet> sealTablets() { - return tablets; + public synchronized List<SubscriptionEvent> 
onEvent(@Nullable final EnrichedEvent event) { + if (isSealed) { + return Collections.emptyList(); + } + if (Objects.nonNull(event)) { + constructBatch(event); + } + if (shouldEmit()) { + final List<SubscriptionEvent> events = generateSubscriptionEvents(); + isSealed = true; + return events; + } + return Collections.emptyList(); } - public synchronized boolean shouldEmit() { - return totalBufferSize >= maxBatchSizeInBytes - || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + public synchronized void ack() { + for (final EnrichedEvent enrichedEvent : enrichedEvents) { + enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true); + } + } + + public synchronized void cleanup() { + // clear the reference count of events + for (final EnrichedEvent enrichedEvent : enrichedEvents) { + enrichedEvent.clearReferenceCount(this.getClass().getName()); + } + } + + private List<SubscriptionEvent> generateSubscriptionEvents() { + final SubscriptionCommitContext commitContext = + prefetchingQueue.generateSubscriptionCommitContext(); + return Collections.singletonList( + new SubscriptionEvent( + new SubscriptionPipeTabletBatchEvents(this), + new SubscriptionPollResponse( + SubscriptionPollResponseType.TABLETS.getType(), + new TabletsPayload(tablets), + commitContext))); } - public synchronized boolean onEvent(final EnrichedEvent event) { + private void constructBatch(final EnrichedEvent event) { if (event instanceof TabletInsertionEvent) { final List<Tablet> currentTablets = convertToTablets((TabletInsertionEvent) event); if (currentTablets.isEmpty()) { - return shouldEmit(); + return; } tablets.addAll(currentTablets); totalBufferSize += currentTablets.stream() - .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes)) + .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes) .reduce(Long::sum) .orElse(0L); enrichedEvents.add(event); @@ -90,7 +137,7 @@ public class SubscriptionPipeTabletEventBatch { tablets.addAll(currentTablets); totalBufferSize += 
currentTablets.stream() - .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes)) + .map(PipeMemoryWeightUtil::calculateTabletSizeInBytes) .reduce(Long::sum) .orElse(0L); } @@ -99,21 +146,11 @@ public class SubscriptionPipeTabletEventBatch { firstEventProcessingTime = System.currentTimeMillis(); } } - - return shouldEmit(); - } - - public synchronized void ack() { - for (final EnrichedEvent enrichedEvent : enrichedEvents) { - enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true); - } } - public synchronized void cleanup() { - // clear the reference count of events - for (final EnrichedEvent enrichedEvent : enrichedEvents) { - enrichedEvent.clearReferenceCount(this.getClass().getName()); - } + private boolean shouldEmit() { + return totalBufferSize >= maxBatchSizeInBytes + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; } private List<Tablet> convertToTablets(final TabletInsertionEvent tabletInsertionEvent) { @@ -131,6 +168,8 @@ public class SubscriptionPipeTabletEventBatch { return Collections.emptyList(); } + /////////////////////////////// stringify /////////////////////////////// + public String toString() { return "SubscriptionPipeTabletEventBatch{enrichedEvents=" + enrichedEvents.stream().map(EnrichedEvent::coreReportMessage).collect(Collectors.toList()) @@ -144,6 +183,8 @@ public class SubscriptionPipeTabletEventBatch { + maxBatchSizeInBytes + ", totalBufferSize=" + totalBufferSize + + ", isSealed=" + + isSealed + "}"; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java index 93f6a9464b4..c36f9ca3964 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java @@ -20,30 +20,54 @@ package org.apache.iotdb.db.subscription.event.batch; import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch; +import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue; +import org.apache.iotdb.db.subscription.event.SubscriptionEvent; +import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; + +import org.checkerframework.checker.nullness.qual.Nullable; import java.io.File; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public class SubscriptionPipeTsFileEventBatch { + private final SubscriptionPrefetchingTsFileQueue prefetchingQueue; + private final PipeTabletEventTsFileBatch batch; + private boolean isSealed = false; + public SubscriptionPipeTsFileEventBatch( - final int maxDelayInMs, final long requestMaxBatchSizeInBytes) { + final SubscriptionPrefetchingTsFileQueue prefetchingQueue, + final int maxDelayInMs, + final long requestMaxBatchSizeInBytes) { + this.prefetchingQueue = prefetchingQueue; this.batch = new PipeTabletEventTsFileBatch(maxDelayInMs, requestMaxBatchSizeInBytes); } - public synchronized List<File> sealTsFiles() throws Exception { - return batch.sealTsFiles(); - } - - public synchronized boolean shouldEmit() { - return batch.shouldEmit(); - } - - public synchronized boolean onEvent(final TabletInsertionEvent event) 
throws Exception { - return batch.onEvent(event); + public synchronized List<SubscriptionEvent> onEvent(@Nullable final TabletInsertionEvent event) + throws Exception { + if (isSealed) { + return Collections.emptyList(); + } + if (Objects.nonNull(event)) { + batch.onEvent(event); + } + if (batch.shouldEmit()) { + final List<SubscriptionEvent> events = generateSubscriptionEvents(); + isSealed = true; + return events; + } + return Collections.emptyList(); } public synchronized void ack() { @@ -55,7 +79,27 @@ public class SubscriptionPipeTsFileEventBatch { batch.close(); } + private List<SubscriptionEvent> generateSubscriptionEvents() throws Exception { + final List<SubscriptionEvent> events = new ArrayList<>(); + final List<File> tsFiles = batch.sealTsFiles(); + final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size()); + for (final File tsFile : tsFiles) { + final SubscriptionCommitContext commitContext = + prefetchingQueue.generateSubscriptionCommitContext(); + events.add( + new SubscriptionEvent( + new SubscriptionPipeTsFileBatchEvents(this, tsFile, referenceCount), + new SubscriptionPollResponse( + SubscriptionPollResponseType.FILE_INIT.getType(), + new FileInitPayload(tsFile.getName()), + commitContext))); + } + return events; + } + + /////////////////////////////// stringify /////////////////////////////// + public String toString() { - return "SubscriptionPipeTsFileEventBatch{batch=" + batch + "}"; + return "SubscriptionPipeTsFileEventBatch{batch=" + batch + ", isSealed=" + isSealed + "}"; } }
