This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 351ab3075ba Subscription: fix topic now timestamp precision (#12663)
351ab3075ba is described below

commit 351ab3075bab4429493f889958fa4bcc7e8986b9
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Jun 6 09:55:35 2024 +0800

    Subscription: fix topic now timestamp precision (#12663)
---
 .../it/dual/AbstractSubscriptionDualIT.java        |  13 +-
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  |   9 +
 .../it/dual/IoTDBSubscriptionTimePrecisionIT.java  | 195 +++++++++++++++++++++
 .../it/dual/IoTDBSubscriptionTopicIT.java          |   2 +-
 .../subscription/topic/CreateTopicProcedure.java   |   6 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  12 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   8 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   8 +-
 .../commons/subscription/meta/topic/TopicMeta.java |   2 +-
 9 files changed, 238 insertions(+), 17 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 1d0ef260ed4..9aa01ad4acf 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -36,19 +36,20 @@ abstract class AbstractSubscriptionDualIT {
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
 
+    setUpConfig();
+
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
+  void setUpConfig() {
     // enable auto create schema
     senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
     receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
 
-    // for IoTDBSubscriptionConsumerGroupIT
-    
receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
-
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
     receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
   }
 
   @After
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 07d5b4ed791..992d151520f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -108,6 +108,15 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
     }
   }
 
+  @Override
+  void setUpConfig() {
+    super.setUpConfig();
+
+    // Enable air gap receiver
+    
receiverEnv.getConfig().getCommonConfig().setPipeAirGapReceiverEnabled(true);
+  }
+
+  @Override
   @Before
   public void setUp() {
     super.setUp();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
new file mode 100644
index 00000000000..2b1cc407b7f
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTimePrecisionIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.dual;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2Subscription.class})
+public class IoTDBSubscriptionTimePrecisionIT extends 
AbstractSubscriptionDualIT {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBSubscriptionTimePrecisionIT.class);
+
+  @Override
+  void setUpConfig() {
+    super.setUpConfig();
+
+    // Set timestamp precision to nanosecond
+    senderEnv.getConfig().getCommonConfig().setTimestampPrecision("ns");
+    receiverEnv.getConfig().getCommonConfig().setTimestampPrecision("ns");
+  }
+
+  @Test
+  public void testTopicTimePrecision() throws Exception {
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+
+    // Insert some historical data on sender
+    final long currentTime1 = System.currentTimeMillis() * 1000_000L; // in 
nanosecond
+    try (final ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s2) values (%s, 1)", 
currentTime1 - i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic on sender
+    final String topic1 = "topic1";
+    final String topic2 = "topic2";
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      {
+        final Properties config = new Properties();
+        config.put(TopicConstant.START_TIME_KEY, currentTime1 - 99);
+        config.put(
+            TopicConstant.END_TIME_KEY,
+            TopicConstant.NOW_TIME_VALUE); // now should be strictly larger 
than current time 1
+        session.createTopic(topic1, config);
+      }
+      {
+        final Properties config = new Properties();
+        config.put(
+            TopicConstant.START_TIME_KEY,
+            TopicConstant.NOW_TIME_VALUE); // now should be strictly smaller 
than current time 2
+        session.createTopic(topic2, config);
+      }
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Insert some historical data on sender again
+    final long currentTime2 = System.currentTimeMillis() * 1000_000L; // in 
nanosecond
+    try (final ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d2(time, s1) values (%s, 1)", 
currentTime2 + i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscribe on sender and insert on receiver
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                      new SubscriptionPullConsumer.Builder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c1")
+                          .consumerGroupId("cg1")
+                          .autoCommit(false)
+                          .buildPullConsumer();
+                  final ISession session = receiverEnv.getSessionConnection()) 
{
+                consumer.open();
+                consumer.subscribe(topic1, topic2);
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  for (final SubscriptionMessage message : messages) {
+                    for (final Iterator<Tablet> it =
+                            
message.getSessionDataSetsHandler().tabletIterator();
+                        it.hasNext(); ) {
+                      final Tablet tablet = it.next();
+                      session.insertTablet(tablet);
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                // Auto unsubscribe topics
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            });
+    thread.start();
+
+    // Check data on receiver
+    try {
+      try (final Connection connection = receiverEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        // Keep retrying if there are execution failures
+        Awaitility.await()
+            
.pollDelay(IoTDBSubscriptionITConstant.AWAITILITY_POLL_DELAY_SECOND, 
TimeUnit.SECONDS)
+            .pollInterval(
+                IoTDBSubscriptionITConstant.AWAITILITY_POLL_INTERVAL_SECOND, 
TimeUnit.SECONDS)
+            .atMost(IoTDBSubscriptionITConstant.AWAITILITY_AT_MOST_SECOND, 
TimeUnit.SECONDS)
+            .untilAsserted(
+                () ->
+                    TestUtils.assertSingleResultSetEqual(
+                        TestUtils.executeQueryWithRetry(statement, "select 
count(*) from root.**"),
+                        new HashMap<String, String>() {
+                          {
+                            put("count(root.db.d1.s2)", "100");
+                            put("count(root.db.d2.s1)", "100");
+                          }
+                        }));
+      }
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 7091c93b4db..0cd6bc0d0a1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -506,7 +506,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       session.open();
       final Properties properties = new Properties();
       properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
-      properties.put(TopicConstant.END_TIME_KEY, "now");
+      properties.put(TopicConstant.END_TIME_KEY, TopicConstant.NOW_TIME_VALUE);
       session.createTopic("topic1", properties);
       fail();
     } catch (final Exception ignored) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index ed3d59bd3d4..afdbfe244d7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.confignode.procedure.impl.subscription.topic;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -76,7 +78,9 @@ public class CreateTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
     topicMeta =
         new TopicMeta(
             createTopicReq.getTopicName(),
-            System.currentTimeMillis(),
+            CommonDateTimeUtils.convertMilliTimeWithPrecision(
+                System.currentTimeMillis(),
+                
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
             createTopicReq.getTopicAttributes());
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index b5bff97ec98..dc86e82e361 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -144,11 +144,13 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
         if (historicalDataExtractionStartTime > 
historicalDataExtractionEndTime) {
           throw new PipeParameterNotValidException(
               String.format(
-                  "%s or %s should be less than or equal to %s or %s.",
+                  "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
                   SOURCE_START_TIME_KEY,
                   EXTRACTOR_START_TIME_KEY,
+                  historicalDataExtractionStartTime,
                   SOURCE_END_TIME_KEY,
-                  EXTRACTOR_END_TIME_KEY));
+                  EXTRACTOR_END_TIME_KEY,
+                  historicalDataExtractionEndTime));
         }
       } catch (final Exception e) {
         // compatible with the current validation framework
@@ -191,11 +193,13 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       if (historicalDataExtractionStartTime > historicalDataExtractionEndTime) 
{
         throw new PipeParameterNotValidException(
             String.format(
-                "%s (%s) should be less than or equal to %s (%s).",
+                "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
                 EXTRACTOR_HISTORY_START_TIME_KEY,
                 SOURCE_HISTORY_START_TIME_KEY,
+                historicalDataExtractionStartTime,
                 EXTRACTOR_HISTORY_END_TIME_KEY,
-                SOURCE_HISTORY_END_TIME_KEY));
+                SOURCE_HISTORY_END_TIME_KEY,
+                historicalDataExtractionEndTime));
       }
     } catch (final Exception e) {
       // Compatible with the current validation framework
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 5415cb182e4..c4bec4246e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -133,11 +133,13 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       if (realtimeDataExtractionStartTime > realtimeDataExtractionEndTime) {
         throw new PipeParameterNotValidException(
             String.format(
-                "%s or %s should be less than or equal to %s or %s.",
+                "%s (%s) [%s] should be less than or equal to %s (%s) [%s].",
                 SOURCE_START_TIME_KEY,
                 EXTRACTOR_START_TIME_KEY,
+                realtimeDataExtractionStartTime,
                 SOURCE_END_TIME_KEY,
-                EXTRACTOR_END_TIME_KEY));
+                EXTRACTOR_END_TIME_KEY,
+                realtimeDataExtractionEndTime));
       }
     } catch (final Exception e) {
       // compatible with the current validation framework
@@ -299,7 +301,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     // Record the pending queue size before trying to put heartbeatEvent into 
queue
     ((PipeHeartbeatEvent) 
event.getEvent()).recordExtractorQueueSize(pendingQueue);
 
-    Event lastEvent = pendingQueue.peekLast();
+    final Event lastEvent = pendingQueue.peekLast();
     if (lastEvent instanceof PipeRealtimeEvent
         && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof 
PipeHeartbeatEvent
         && (((PipeHeartbeatEvent) ((PipeRealtimeEvent) 
lastEvent).getEvent()).isShouldPrintMessage()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d0ebc933382..cb4d6f57fb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -49,6 +49,7 @@ import 
org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoader;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -1917,7 +1918,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
     // Validate topic config
     final TopicMeta temporaryTopicMeta =
-        new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
+        new TopicMeta(
+            topicName,
+            CommonDateTimeUtils.convertMilliTimeWithPrecision(
+                System.currentTimeMillis(),
+                
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+            topicAttributes);
     try {
       
PipeAgent.plugin().validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
       
PipeAgent.plugin().validateProcessor(temporaryTopicMeta.generateProcessorAttributes());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 33f5065cbae..a0e80db04e0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -39,7 +39,7 @@ import java.util.Set;
 public class TopicMeta {
 
   private String topicName;
-  private long creationTime;
+  private long creationTime; // raw timestamp based on system timestamp 
precision
   private TopicConfig config;
 
   private Set<String> subscribedConsumerGroupIds;

Reply via email to