This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6cbdf31bf6f Subscription: support topic loose range for path and time (#12760)
6cbdf31bf6f is described below
commit 6cbdf31bf6f2af2d41fbe19f910f49751c937403
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jun 19 20:37:41 2024 +0800
Subscription: support topic loose range for path and time (#12760)
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 36 ++++++
.../pipe/it/autocreate/IoTDBPipeExtractorIT.java | 15 ++-
.../it/IoTDBSubscriptionITConstant.java | 2 +-
.../it/dual/IoTDBSubscriptionTopicIT.java | 135 +++++++++++++++++++++
.../iotdb/rpc/subscription/config/TopicConfig.java | 34 +++---
.../rpc/subscription/config/TopicConstant.java | 6 +
.../commons/subscription/meta/topic/TopicMeta.java | 2 +
7 files changed, 213 insertions(+), 17 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 176d0158dfc..372c9200c5e 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -704,6 +704,9 @@ public class TestUtils {
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
@@ -720,6 +723,36 @@ public class TestUtils {
}
}
+ public static void assertDataEventuallyOnEnv(
+ BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
+ assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
+ }
+
+ public static void assertDataEventuallyOnEnv(
+ BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult, long timeoutSeconds) {
+ try (Connection connection = env.getConnection();
+ Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
+ .atMost(timeoutSeconds, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try {
+ TestUtils.assertSingleResultSetEqual(
+ executeQueryWithRetry(statement, sql), expectedHeaderWithResult);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
public static void assertDataAlwaysOnEnv(
BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
@@ -735,6 +768,9 @@ public class TestUtils {
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
.atMost(consistentSeconds, TimeUnit.SECONDS)
.failFast(
() -> {
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 8ce885935b8..2a4439eb605 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -1013,12 +1013,23 @@ public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {
return;
}
- TestUtils.tryExecuteNonQueriesWithRetry(
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1 (time, at1)" + " values (5000, 1), (16000, 3)",
"insert into root.db.d1 (time, at1, at2)" + " values (5001, 1, 2), (6001, 3, 4)",
- "flush"));
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(at1) from root.db.d1 where time >= 2000 and time <= 10000",
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.at1)", "4");
+ }
+ });
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 840abb4ccb5..5b8ec393274 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
public class IoTDBSubscriptionITConstant {
private static final long AWAITILITY_POLL_DELAY_SECOND = 1L;
- private static final long AWAITILITY_POLL_INTERVAL_SECOND = 2L;
+ private static final long AWAITILITY_POLL_INTERVAL_SECOND = 1L;
private static final long AWAITILITY_AT_MOST_SECOND = 600L;
public static final ConditionFactory AWAIT =
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 19c033af151..d4aee45c482 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -533,6 +533,8 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
@Test
public void testTopicInvalidPathConfig() throws Exception {
// Test invalid path when using tsfile format
+ // NOTE: Delete this test after the restriction "on path/time range/processor when subscribing
+ // to tsfile" is removed.
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
@@ -542,6 +544,8 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
@Test
public void testTopicInvalidProcessorConfig() throws Exception {
// Test invalid processor when using tsfile format
+ // NOTE: Delete this test after the restriction "on path/time range/processor when subscribing
+ // to tsfile" is removed.
final Properties config = new Properties();
config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
config.put("processor", "tumbling-time-sampling-processor");
@@ -578,6 +582,7 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
e.printStackTrace();
fail(e.getMessage());
}
+ assertTopicCount(1);
// Subscription
final AtomicInteger rowCount = new AtomicInteger();
@@ -652,6 +657,136 @@ public class IoTDBSubscriptionTopicIT extends AbstractSubscriptionDualIT {
}
}
+ @Test
+ public void testTopicWithLooseRange() throws Exception {
+ // Insert some historical data on sender
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ session.executeNonQueryStatement(
+ "insert into root.db.d1 (time, at1, at2) values (1000, 1, 2), (2000, 3, 4)");
+ session.executeNonQueryStatement(
+ "insert into root.db1.d1 (time, at1, at2) values (1000, 1, 2), (2000, 3, 4)");
+ session.executeNonQueryStatement(
+ "insert into root.db.d1 (time, at1, at2) values (3000, 1, 2), (4000, 3, 4)");
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic12";
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.LOOSE_RANGE_KEY, TopicConstant.LOOSE_RANGE_TIME_AND_PATH_VALUE);
+ config.put(TopicConstant.PATH_KEY, "root.db.d1.at1");
+ config.put(TopicConstant.START_TIME_KEY, "1500");
+ config.put(TopicConstant.END_TIME_KEY, "2500");
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ assertTopicCount(1);
+
+ final AtomicBoolean dataPrepared = new AtomicBoolean(false);
+ final AtomicBoolean topicSubscribed = new AtomicBoolean(false);
+ final AtomicBoolean result = new AtomicBoolean(false);
+ final List<Thread> threads = new ArrayList<>();
+
+ // Subscribe on sender
+ threads.add(
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer();
+ final ISession session = receiverEnv.getSessionConnection()) {
+ consumer.open();
+ consumer.subscribe(topicName);
+ topicSubscribed.set(true);
+ while (!result.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
+ if (dataPrepared.get()) {
+ final List<SubscriptionMessage> messages =
+ consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ for (final Iterator<Tablet> it =
+ message.getSessionDataSetsHandler().tabletIterator();
+ it.hasNext(); ) {
+ final Tablet tablet = it.next();
+ session.insertTablet(tablet);
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ }
+ consumer.unsubscribe(topicName);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ },
+ String.format("%s - consumer", testName.getMethodName())));
+
+ // Insert some realtime data on sender
+ threads.add(
+ new Thread(
+ () -> {
+ while (!topicSubscribed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
+ }
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ session.executeNonQueryStatement(
+ "insert into root.db.d1 (time, at1, at2) values (1001, 1, 2), (2001, 3, 4)");
+ session.executeNonQueryStatement(
+ "insert into root.db1.d1 (time, at1, at2) values (1001, 1, 2), (2001, 3, 4)");
+ session.executeNonQueryStatement(
+ "insert into root.db.d1 (time, at1, at2) values (3001, 1, 2), (4001, 3, 4)");
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ dataPrepared.set(true);
+ },
+ String.format("%s - data inserter", testName.getMethodName())));
+
+ for (final Thread thread : threads) {
+ thread.start();
+ }
+
+ try (final Connection connection = receiverEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ AWAIT.untilAsserted(
+ () ->
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(
+ statement,
+ "select count(at1) from root.db.d1 where time >= 1500 and time <= 2500"),
+ new HashMap<String, String>() {
+ {
+ put("count(root.db.d1.at1)", "2");
+ }
+ }));
+ }
+
+ result.set(true);
+ for (final Thread thread : threads) {
+ thread.join();
+ }
+ }
+
private void testTopicInvalidRuntimeConfigTemplate(
final String topicName, final Properties config) throws Exception {
// Create topic
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index c3b5f24622c..6dc42ed634a 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -28,7 +28,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class TopicConfig extends PipeParameters {
@@ -40,14 +43,6 @@ public class TopicConfig extends PipeParameters {
super(attributes);
}
- private static final Map<String, String> LOOSE_RANGE_TIME_CONFIG =
- new HashMap<String, String>() {
- {
- put("history.loose-range", "time");
- put("realtime.loose-range", "time");
- }
- };
-
private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
Collections.singletonMap("realtime.mode", "batch");
private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
@@ -58,6 +53,15 @@ public class TopicConfig extends PipeParameters {
private static final Map<String, String> SUBSCRIBE_MODE_CONFIG =
Collections.singletonMap("mode", "subscribe");
+ private static final Set<String> LOOSE_RANGE_KEY_SET =
+ Collections.unmodifiableSet(
+ new HashSet<String>() {
+ {
+ add("history.loose-range");
+ add("realtime.loose-range");
+ }
+ });
+
/////////////////////////////// de/ser ///////////////////////////////
public void serialize(final DataOutputStream stream) throws IOException {
@@ -102,12 +106,6 @@ public class TopicConfig extends PipeParameters {
attributesWithTimeRange.put(TopicConstant.END_TIME_KEY, endTime);
}
- // enable loose range when using tsfile format
- if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
- attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE))) {
- attributesWithTimeRange.putAll(LOOSE_RANGE_TIME_CONFIG);
- }
-
return attributesWithTimeRange;
}
@@ -125,6 +123,14 @@ public class TopicConfig extends PipeParameters {
: SUBSCRIBE_MODE_CONFIG;
}
+ public Map<String, String> getAttributesWithSourceLooseRange() {
+ final String looseRangeValue =
+ attributes.getOrDefault(
+ TopicConstant.LOOSE_RANGE_KEY, TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
+ return LOOSE_RANGE_KEY_SET.stream()
+ .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
+ }
+
public Map<String, String> getAttributesWithProcessorPrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index a6e8ec901bf..a3226c1d640 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -40,6 +40,12 @@ public class TopicConstant {
public static final String FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
public static final String FORMAT_DEFAULT_VALUE = FORMAT_SESSION_DATA_SETS_HANDLER_VALUE;
+ public static final String LOOSE_RANGE_KEY = "loose-range";
+ public static final String LOOSE_RANGE_TIME_VALUE = "time";
+ public static final String LOOSE_RANGE_PATH_VALUE = "path";
+ public static final String LOOSE_RANGE_TIME_AND_PATH_VALUE = "time,path";
+ public static final String LOOSE_RANGE_DEFAULT_VALUE = "";
+
private TopicConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 70666a07182..14c48c85da3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -187,6 +187,8 @@ public class TopicMeta {
extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
// source mode
extractorAttributes.putAll(config.getAttributesWithSourceMode());
+ // loose range
+ extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
return extractorAttributes;
}