This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cbdf31bf6f Subscription: support topic loose range for path and time  
(#12760)
6cbdf31bf6f is described below

commit 6cbdf31bf6f2af2d41fbe19f910f49751c937403
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jun 19 20:37:41 2024 +0800

    Subscription: support topic loose range for path and time  (#12760)
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  36 ++++++
 .../pipe/it/autocreate/IoTDBPipeExtractorIT.java   |  15 ++-
 .../it/IoTDBSubscriptionITConstant.java            |   2 +-
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 135 +++++++++++++++++++++
 .../iotdb/rpc/subscription/config/TopicConfig.java |  34 +++---
 .../rpc/subscription/config/TopicConstant.java     |   6 +
 .../commons/subscription/meta/topic/TopicMeta.java |   2 +
 7 files changed, 213 insertions(+), 17 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 176d0158dfc..372c9200c5e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -704,6 +704,9 @@ public class TestUtils {
         Statement statement = connection.createStatement()) {
       // Keep retrying if there are execution failures
       await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
           .atMost(timeoutSeconds, TimeUnit.SECONDS)
           .untilAsserted(
               () -> {
@@ -720,6 +723,36 @@ public class TestUtils {
     }
   }
 
+  public static void assertDataEventuallyOnEnv(
+      BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
+    assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
+  }
+
+  public static void assertDataEventuallyOnEnv(
+      BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult, 
long timeoutSeconds) {
+    try (Connection connection = env.getConnection();
+        Statement statement = connection.createStatement()) {
+      // Keep retrying if there are execution failures
+      await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
+          .atMost(timeoutSeconds, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try {
+                  TestUtils.assertSingleResultSetEqual(
+                      executeQueryWithRetry(statement, sql), 
expectedHeaderWithResult);
+                } catch (Exception e) {
+                  Assert.fail();
+                }
+              });
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   public static void assertDataAlwaysOnEnv(
       BaseEnv env, String sql, String expectedHeader, Set<String> 
expectedResSet) {
     assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
@@ -735,6 +768,9 @@ public class TestUtils {
         Statement statement = connection.createStatement()) {
       // Keep retrying if there are execution failures
       await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
           .atMost(consistentSeconds, TimeUnit.SECONDS)
           .failFast(
               () -> {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 8ce885935b8..2a4439eb605 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -1013,12 +1013,23 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
         return;
       }
 
-      TestUtils.tryExecuteNonQueriesWithRetry(
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
           Arrays.asList(
               "insert into root.db.d1 (time, at1)" + " values (5000, 1), 
(16000, 3)",
               "insert into root.db.d1 (time, at1, at2)" + " values (5001, 1, 
2), (6001, 3, 4)",
-              "flush"));
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(at1) from root.db.d1 where time >= 2000 and time <= 
10000",
+          new HashMap<String, String>() {
+            {
+              put("count(root.db.d1.at1)", "4");
+            }
+          });
     }
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 840abb4ccb5..5b8ec393274 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 public class IoTDBSubscriptionITConstant {
 
   private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
-  private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
+  private static final long AWAITILITY_POLL_INTERVAL_SECOND = 1L;
   private static final long AWAITILITY_AT_MOST_SECOND = 600L;
 
   public static final ConditionFactory AWAIT =
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 19c033af151..d4aee45c482 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -533,6 +533,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   @Test
   public void testTopicInvalidPathConfig() throws Exception {
     // Test invalid path when using tsfile format
+    // NOTE: Delete this test after the restriction "on path/time 
range/processor when subscribing
+    // to tsfile" is removed.
     final Properties config = new Properties();
     config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
     config.put(TopicConstant.PATH_KEY, "root.db.*.s");
@@ -542,6 +544,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
   @Test
   public void testTopicInvalidProcessorConfig() throws Exception {
     // Test invalid processor when using tsfile format
+    // NOTE: Delete this test after the restriction "on path/time 
range/processor when subscribing
+    // to tsfile" is removed.
     final Properties config = new Properties();
     config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
     config.put("processor", "tumbling-time-sampling-processor");
@@ -578,6 +582,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       e.printStackTrace();
       fail(e.getMessage());
     }
+    assertTopicCount(1);
 
     // Subscription
     final AtomicInteger rowCount = new AtomicInteger();
@@ -652,6 +657,136 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     }
   }
 
+  @Test
+  public void testTopicWithLooseRange() throws Exception {
+    // Insert some historical data on sender
+    try (final ISession session = senderEnv.getSessionConnection()) {
+      session.executeNonQueryStatement(
+          "insert into root.db.d1 (time, at1, at2) values (1000, 1, 2), (2000, 
3, 4)");
+      session.executeNonQueryStatement(
+          "insert into root.db1.d1 (time, at1, at2) values (1000, 1, 2), 
(2000, 3, 4)");
+      session.executeNonQueryStatement(
+          "insert into root.db.d1 (time, at1, at2) values (3000, 1, 2), (4000, 
3, 4)");
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic12";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.LOOSE_RANGE_KEY, 
TopicConstant.LOOSE_RANGE_TIME_AND_PATH_VALUE);
+      config.put(TopicConstant.PATH_KEY, "root.db.d1.at1");
+      config.put(TopicConstant.START_TIME_KEY, "1500");
+      config.put(TopicConstant.END_TIME_KEY, "2500");
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    assertTopicCount(1);
+
+    final AtomicBoolean dataPrepared = new AtomicBoolean(false);
+    final AtomicBoolean topicSubscribed = new AtomicBoolean(false);
+    final AtomicBoolean result = new AtomicBoolean(false);
+    final List<Thread> threads = new ArrayList<>();
+
+    // Subscribe on sender
+    threads.add(
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                      new SubscriptionPullConsumer.Builder()
+                          .host(host)
+                          .port(port)
+                          .consumerId("c1")
+                          .consumerGroupId("cg1")
+                          .autoCommit(false)
+                          .buildPullConsumer();
+                  final ISession session = receiverEnv.getSessionConnection()) 
{
+                consumer.open();
+                consumer.subscribe(topicName);
+                topicSubscribed.set(true);
+                while (!result.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  if (dataPrepared.get()) {
+                    final List<SubscriptionMessage> messages =
+                        
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                    for (final SubscriptionMessage message : messages) {
+                      for (final Iterator<Tablet> it =
+                              
message.getSessionDataSetsHandler().tabletIterator();
+                          it.hasNext(); ) {
+                        final Tablet tablet = it.next();
+                        session.insertTablet(tablet);
+                      }
+                    }
+                    consumer.commitSync(messages);
+                  }
+                }
+                consumer.unsubscribe(topicName);
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getMethodName())));
+
+    // Insert some realtime data on sender
+    threads.add(
+        new Thread(
+            () -> {
+              while (!topicSubscribed.get()) {
+                LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+              }
+              try (final ISession session = senderEnv.getSessionConnection()) {
+                session.executeNonQueryStatement(
+                    "insert into root.db.d1 (time, at1, at2) values (1001, 1, 
2), (2001, 3, 4)");
+                session.executeNonQueryStatement(
+                    "insert into root.db1.d1 (time, at1, at2) values (1001, 1, 
2), (2001, 3, 4)");
+                session.executeNonQueryStatement(
+                    "insert into root.db.d1 (time, at1, at2) values (3001, 1, 
2), (4001, 3, 4)");
+                session.executeNonQueryStatement("flush");
+              } catch (final Exception e) {
+                e.printStackTrace();
+                fail(e.getMessage());
+              }
+              dataPrepared.set(true);
+            },
+            String.format("%s - data inserter", testName.getMethodName())));
+
+    for (final Thread thread : threads) {
+      thread.start();
+    }
+
+    try (final Connection connection = receiverEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () ->
+              TestUtils.assertSingleResultSetEqual(
+                  TestUtils.executeQueryWithRetry(
+                      statement,
+                      "select count(at1) from root.db.d1 where time >= 1500 
and time <= 2500"),
+                  new HashMap<String, String>() {
+                    {
+                      put("count(root.db.d1.at1)", "2");
+                    }
+                  }));
+    }
+
+    result.set(true);
+    for (final Thread thread : threads) {
+      thread.join();
+    }
+  }
+
   private void testTopicInvalidRuntimeConfigTemplate(
       final String topicName, final Properties config) throws Exception {
     // Create topic
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index c3b5f24622c..6dc42ed634a 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -28,7 +28,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class TopicConfig extends PipeParameters {
 
@@ -40,14 +43,6 @@ public class TopicConfig extends PipeParameters {
     super(attributes);
   }
 
-  private static final Map<String, String> LOOSE_RANGE_TIME_CONFIG =
-      new HashMap<String, String>() {
-        {
-          put("history.loose-range", "time");
-          put("realtime.loose-range", "time");
-        }
-      };
-
   private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
       Collections.singletonMap("realtime.mode", "batch");
   private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
@@ -58,6 +53,15 @@ public class TopicConfig extends PipeParameters {
   private static final Map<String, String> SUBSCRIBE_MODE_CONFIG =
       Collections.singletonMap("mode", "subscribe");
 
+  private static final Set<String> LOOSE_RANGE_KEY_SET =
+      Collections.unmodifiableSet(
+          new HashSet<String>() {
+            {
+              add("history.loose-range");
+              add("realtime.loose-range");
+            }
+          });
+
   /////////////////////////////// de/ser ///////////////////////////////
 
   public void serialize(final DataOutputStream stream) throws IOException {
@@ -102,12 +106,6 @@ public class TopicConfig extends PipeParameters {
       attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime);
     }
 
-    // enable loose range when using tsfile format
-    if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
-        attributes.getOrDefault(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_DEFAULT_VALUE))) {
-      attributesWithTimeRange.putAll(LOOSE_RANGE_TIME_CONFIG);
-    }
-
     return attributesWithTimeRange;
   }
 
@@ -125,6 +123,14 @@ public class TopicConfig extends PipeParameters {
         : SUBSCRIBE_MODE_CONFIG;
   }
 
+  public Map<String, String> getAttributesWithSourceLooseRange() {
+    final String looseRangeValue =
+        attributes.getOrDefault(
+            TopicConstant.LOOSE_RANGE_KEY, 
TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
+    return LOOSE_RANGE_KEY_SET.stream()
+        .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
+  }
+
   public Map<String, String> getAttributesWithProcessorPrefix() {
     final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
     attributes.forEach(
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index a6e8ec901bf..a3226c1d640 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -40,6 +40,12 @@ public class TopicConstant {
   public static final String FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
   public static final String FORMAT_DEFAULT_VALUE = 
FORMAT_SESSION_DATA_SETS_HANDLER_VALUE;
 
+  public static final String LOOSE_RANGE_KEY = "loose-range";
+  public static final String LOOSE_RANGE_TIME_VALUE = "time";
+  public static final String LOOSE_RANGE_PATH_VALUE = "path";
+  public static final String LOOSE_RANGE_TIME_AND_PATH_VALUE = "time,path";
+  public static final String LOOSE_RANGE_DEFAULT_VALUE = "";
+
   private TopicConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 70666a07182..14c48c85da3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -187,6 +187,8 @@ public class TopicMeta {
     extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
     // source mode
     extractorAttributes.putAll(config.getAttributesWithSourceMode());
+    // loose range
+    extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
     return extractorAttributes;
   }
 

Reply via email to