This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 351ab3075ba Subscription: fix topic now timestamp precision (#12663)
351ab3075ba is described below
commit 351ab3075bab4429493f889958fa4bcc7e8986b9
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Jun 6 09:55:35 2024 +0800
Subscription: fix topic now timestamp precision (#12663)
---
.../it/dual/AbstractSubscriptionDualIT.java | 13 +-
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 9 +
.../it/dual/IoTDBSubscriptionTimePrecisionIT.java | 195 +++++++++++++++++++++
.../it/dual/IoTDBSubscriptionTopicIT.java | 2 +-
.../subscription/topic/CreateTopicProcedure.java | 6 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 12 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 8 +-
.../config/executor/ClusterConfigTaskExecutor.java | 8 +-
.../commons/subscription/meta/topic/TopicMeta.java | 2 +-
9 files changed, 238 insertions(+), 17 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 1d0ef260ed4..9aa01ad4acf 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -36,19 +36,20 @@ abstract class AbstractSubscriptionDualIT {
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);
+ setUpConfig();
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
+ void setUpConfig() {
// enable auto create schema
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
- // for IoTDBSubscriptionConsumerGroupIT
- receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
-
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
-
- senderEnv.initClusterEnvironment();
- receiverEnv.initClusterEnvironment();
}
@After
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 07d5b4ed791..992d151520f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -108,6 +108,15 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
}
}
+ @Override
+ void setUpConfig() {
+ super.setUpConfig();
+
+ // Enable air gap receiver
+ receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
+ }
+
+ @Override
@Before
public void setUp() {
super.setUp();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
new file mode 100644
index 00000000000..2b1cc407b7f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.dual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2Subscription.class})
+public class IoTDBSubscriptionTimePrecisionIT extends
AbstractSubscriptionDualIT {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class);
+
+ @Override
+ void setUpConfig() {
+ super.setUpConfig();
+
+ // Set timestamp precision to nanosecond
+ senderEnv.getConfig().getCommonConfig().setTimestampPrecision("ns");
+ receiverEnv.getConfig().getCommonConfig().setTimestampPrecision("ns");
+ }
+
+ @Test
+ public void testTopicTimePrecision() throws Exception {
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+
+ // Insert some historical data on sender
+ final long currentTime1 = System.currentTimeMillis() * 1000_000L; // in nanosecond
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s2) values (%s, 1)",
currentTime1 - i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic on sender
+ final String topic1 = "topic1";
+ final String topic2 = "topic2";
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ {
+ final Properties config = new Properties();
+ config.put(TopicConstant.START_TIME_KEY, currentTime1 - 99);
+ config.put(
+ TopicConstant.END_TIME_KEY,
+ TopicConstant.NOW_TIME_VALUE); // now should be strictly larger than current time 1
+ session.createTopic(topic1, config);
+ }
+ {
+ final Properties config = new Properties();
+ config.put(
+ TopicConstant.START_TIME_KEY,
+ TopicConstant.NOW_TIME_VALUE); // now should be strictly smaller than current time 2
+ session.createTopic(topic2, config);
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Insert some historical data on sender again
+ final long currentTime2 = System.currentTimeMillis() * 1000_000L; // in nanosecond
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d2(time, s1) values (%s, 1)",
currentTime2 + i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscribe on sender and insert on receiver
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer();
+ final ISession session = receiverEnv.getSessionConnection())
{
+ consumer.open();
+ consumer.subscribe(topic1, topic2);
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
+ final List<SubscriptionMessage> messages =
+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ for (final Iterator<Tablet> it =
+
message.getSessionDataSetsHandler().tabletIterator();
+ it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ // Auto unsubscribe topics
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Check data on receiver
+ try {
+ try (final Connection connection = receiverEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND,
TimeUnit.SECONDS)
+ .pollInterval(
+ IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND,
TimeUnit.SECONDS)
+ .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND,
TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "select
count(*) from root.**"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.s2)", "100");
+ put("count(root.db.d2.s1)", "100");
+ }
+ }));
+ }
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 7091c93b4db..0cd6bc0d0a1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -506,7 +506,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
session.open();
final Properties properties = new Properties();
properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
- properties.put(TopicConstant.END_TIME_KEY, "now");
+ properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE);
session.createTopic("topic1", properties);
fail();
} catch (final Exception ignored) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index ed3d59bd3d4..afdbfe244d7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -76,7 +78,9 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
topicMeta =
new TopicMeta(
createTopicReq.getTopicName(),
- System.currentTimeMillis(),
+ CommonDateTimeUtils.convertMilliTimeWithPrecision(
+ System.currentTimeMillis(),
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
createTopicReq.getTopicAttributes());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index b5bff97ec98..dc86e82e361 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -144,11 +144,13 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
if (historicalDataExtractionStartTime >
historicalDataExtractionEndTime) {
throw new PipeParameterNotValidException(
String.format(
- "%s or %s should be less than or equal to %s or %s.",
+ "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
+ historicalDataExtractionStartTime,
SOURCE_END_TIME_KEY,
- EXTRACTOR_END_TIME_KEY));
+ EXTRACTOR_END_TIME_KEY,
+ historicalDataExtractionEndTime));
}
} catch (final Exception e) {
// compatible with the current validation framework
@@ -191,11 +193,13 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
if (historicalDataExtractionStartTime > historicalDataExtractionEndTime)
{
throw new PipeParameterNotValidException(
String.format(
- "%s (%s) should be less than or equal to %s (%s).",
+ "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY,
+ historicalDataExtractionStartTime,
EXTRACTOR_HISTORY_END_TIME_KEY,
- SOURCE_HISTORY_END_TIME_KEY));
+ SOURCE_HISTORY_END_TIME_KEY,
+ historicalDataExtractionEndTime));
}
} catch (final Exception e) {
// Compatible with the current validation framework
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 5415cb182e4..c4bec4246e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -133,11 +133,13 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
if (realtimeDataExtractionStartTime > realtimeDataExtractionEndTime) {
throw new PipeParameterNotValidException(
String.format(
- "%s or %s should be less than or equal to %s or %s.",
+ "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
+ realtimeDataExtractionStartTime,
SOURCE_END_TIME_KEY,
- EXTRACTOR_END_TIME_KEY));
+ EXTRACTOR_END_TIME_KEY,
+ realtimeDataExtractionEndTime));
}
} catch (final Exception e) {
// compatible with the current validation framework
@@ -299,7 +301,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
// Record the pending queue size before trying to put heartbeatEvent into queue
((PipeHeartbeatEvent)
event.getEvent()).recordExtractorQueueSize(pendingQueue);
- Event lastEvent = pendingQueue.peekLast();
+ final Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof
PipeHeartbeatEvent
&& (((PipeHeartbeatEvent) ((PipeRealtimeEvent)
lastEvent).getEvent()).isShouldPrintMessage()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d0ebc933382..cb4d6f57fb6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -1917,7 +1918,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Validate topic config
final TopicMeta temporaryTopicMeta =
- new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
+ new TopicMeta(
+ topicName,
+ CommonDateTimeUtils.convertMilliTimeWithPrecision(
+ System.currentTimeMillis(),
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ topicAttributes);
try {
PipeAgent.plugin().validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
PipeAgent.plugin().validateProcessor(temporaryTopicMeta.generateProcessorAttributes());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 33f5065cbae..a0e80db04e0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -39,7 +39,7 @@ import java.util.Set;
public class TopicMeta {
private String topicName;
- private long creationTime;
+ private long creationTime; // raw timestamp based on system timestamp precision
private TopicConfig config;
private Set<String> subscribedConsumerGroupIds;