This is an automated email from the ASF dual-hosted git repository.
VGalaxies pushed a commit to branch feature/subscription-column-filter
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 05dd8a01e4691d249baeb1d894636a0aabb61380
Author: Codex <codex@localhost>
AuthorDate: Fri Jun 12 17:30:10 2026 +0000
Add subscription column filter IT follow-ups
---
.../ConsensusSubscriptionTableITSupport.java | 26 +++
...ConsensusSubscriptionColumnFilterClusterIT.java | 232 +++++++++++++++++++++
.../IoTDBConsensusSubscriptionFilterTableIT.java | 19 +-
.../IoTDBSubscriptionColumnFilterIT.java | 136 +++++++++++-
4 files changed, 411 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
index ff006b8c4ee..ace3e700dbe 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
@@ -291,6 +291,32 @@ final class ConsensusSubscriptionTableITSupport {
return consumed;
}
+ /**
+  * Polls the consumer and commits every non-empty batch until all expected row keys were seen.
+  *
+  * <p>Polling stops as soon as the accumulated row keys contain every entry of {@code
+  * expectedRowKeys}, or after {@code maxPollRounds} polls, whichever comes first. Empty polls
+  * still count as a round.
+  *
+  * @param consumer the pull consumer to poll and commit on
+  * @param expectedRowKeys row keys that must all be observed before polling stops early
+  * @param maxPollRounds upper bound on the number of polls
+  * @return everything consumed so far; callers must check it, since the expected keys may still
+  *     be missing when the round budget is exhausted
+  * @throws Exception if polling, consuming or committing fails
+  */
+ static ConsumedRecords pollAndCommitUntilContains(
+     final SubscriptionTablePullConsumer consumer,
+     final Set<String> expectedRowKeys,
+     final int maxPollRounds)
+     throws Exception {
+   final ConsumedRecords accumulated = new ConsumedRecords();
+
+   for (int attempt = 0; attempt < maxPollRounds; attempt++) {
+     final List<SubscriptionMessage> batch = consumer.poll(DEFAULT_POLL_TIMEOUT);
+     if (!batch.isEmpty()) {
+       accumulated.merge(consumeMessages(batch));
+       consumer.commitSync(batch);
+     }
+
+     // Checked after every round, including empty ones, so an already satisfied expectation
+     // ends the loop without waiting for further data.
+     if (accumulated.getRowKeys().containsAll(expectedRowKeys)) {
+       return accumulated;
+     }
+   }
+
+   return accumulated;
+ }
+
static ConsumedRecords pollWithInfoAndCommitUntilAtLeast(
final SubscriptionTablePullConsumer consumer,
final Set<String> topicNames,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java
new file mode 100644
index 00000000000..d2fd5552ee7
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.consensus.local.tablemodel;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumer;
+import
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Cluster integration test for consensus-mode subscription topics that carry a column filter.
+ *
+ * <p>The single scenario verifies that a column filter altered after a topic-owner transfer is
+ * applied to the data consumed by the new owner.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableClusterIT.class})
+public class IoTDBConsensusSubscriptionColumnFilterClusterIT extends AbstractSubscriptionIT {
+
+  /** Lease duration used both when creating the topic and when transferring its owner. */
+  private static final long OWNER_LEASE_DURATION_MS = 60_000L;
+
+  /** Owner identity the topic is created with. */
+  private static final String INITIAL_OWNER_ID = "owner1";
+
+  private static final long INITIAL_OWNER_EPOCH = 1L;
+
+  /** Owner identity the topic is transferred to. */
+  private static final String TRANSFERRED_OWNER_ID = "owner2";
+
+  private static final long TRANSFERRED_OWNER_EPOCH = 2L;
+
+  /**
+   * Configures consensus protocols, replication factors and subscription switches, then starts
+   * the cluster.
+   *
+   * <p>NOTE(review): {@code initClusterEnvironment(1, 3)} presumably means one ConfigNode and
+   * three DataNodes (the test name refers to three DataNodes); confirm against the env API.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(1)
+        .setDataReplicationFactor(2)
+        .setAutoCreateSchemaEnabled(true)
+        .setSubscriptionEnabled(true)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setSubscriptionOwnerLeaseDurationMsMin(1000);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  /**
+   * Stops the cluster and then runs the superclass teardown.
+   *
+   * <p>The superclass teardown runs in a {@code finally} block so that it is not skipped when
+   * cleaning the cluster environment throws.
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvFactory.getEnv().cleanClusterEnvironment();
+    } finally {
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Consumes with the initial owner under a {@code column_name = "s1"} filter, then transfers
+   * the topic owner, alters the filter to {@code category = "FIELD"}, and checks that the new
+   * owner receives the rows written afterwards with all field columns, including the column
+   * added before the transfer.
+   */
+  @Test
+  public void testAlterColumnFilterRebindsAfterOwnerTransferOnThreeDataNodes()
+      throws Exception {
+    final ConsensusSubscriptionTableITSupport.TestIdentifiers ids =
+        ConsensusSubscriptionTableITSupport.newIdentifiers("cluster_owner_rebind");
+    final String database = ids.getDatabase();
+    final String table = "t1";
+    final String schema = "tag1 STRING TAG, s1 INT64 FIELD, s2 DOUBLE FIELD, s3 BOOLEAN FIELD";
+    SubscriptionTablePullConsumer ownerConsumer = null;
+
+    try {
+      ConsensusSubscriptionTableITSupport.createDatabaseAndTable(database, table, schema);
+      ConsensusSubscriptionTableITSupport.insertRows(database, table, 0L, 1, true, true, true);
+      createOwnedConsensusTopic(ids.getTopic(), database, table, "column_name = \"s1\"");
+
+      ownerConsumer =
+          createOwnerConsumer(
+              ids.consumer("owner1"),
+              ids.consumerGroup("owner"),
+              INITIAL_OWNER_ID,
+              INITIAL_OWNER_EPOCH);
+      ownerConsumer.subscribe(ids.getTopic());
+
+      final Set<String> rowsBeforeTransfer =
+          ConsensusSubscriptionTableITSupport.insertRows(
+              database, table, 100L, 3, true, true, true);
+      final ConsensusSubscriptionTableITSupport.ConsumedRecords beforeTransfer =
+          ConsensusSubscriptionTableITSupport.pollAndCommitUntilAtLeast(
+              ownerConsumer, rowsBeforeTransfer.size(), 60);
+
+      ConsensusSubscriptionTableITSupport.assertExactRowKeys(rowsBeforeTransfer, beforeTransfer);
+      Assert.assertEquals(
+          Collections.singleton(
+              ConsensusSubscriptionTableITSupport.normalizeColumnSignature("tag1", "s1")),
+          beforeTransfer.getSeenColumnSignatures());
+
+      // Release the first owner completely before the ownership changes hands.
+      ownerConsumer.unsubscribe(ids.getTopic());
+      ownerConsumer.close();
+      ownerConsumer = null;
+
+      addColumn(database, table, "s4 INT32 FIELD");
+      transferOwner(ids.getTopic(), TRANSFERRED_OWNER_ID, TRANSFERRED_OWNER_EPOCH);
+      ConsensusSubscriptionTableITSupport.alterConsensusTopicColumnFilter(
+          ids.getTopic(), "category = \"FIELD\"");
+
+      ownerConsumer =
+          createOwnerConsumer(
+              ids.consumer("owner2"),
+              ids.consumerGroup("owner"),
+              TRANSFERRED_OWNER_ID,
+              TRANSFERRED_OWNER_EPOCH);
+      ownerConsumer.subscribe(ids.getTopic());
+
+      final Set<String> rowsAfterTransfer = insertRowsWithS4(database, table, 200L, 3);
+      final ConsensusSubscriptionTableITSupport.ConsumedRecords afterTransfer =
+          ConsensusSubscriptionTableITSupport.pollAndCommitUntilContains(
+              ownerConsumer, rowsAfterTransfer, 80);
+
+      Assert.assertTrue(
+          "Missing post-transfer rows. Consumed records: " + afterTransfer,
+          afterTransfer.getRowKeys().containsAll(rowsAfterTransfer));
+      Assert.assertTrue(
+          "Missing full FIELD column signature. Consumed records: " + afterTransfer,
+          afterTransfer
+              .getSeenColumnSignatures()
+              .contains(
+                  ConsensusSubscriptionTableITSupport.normalizeColumnSignature(
+                      "tag1", "s1", "s2", "s3", "s4")));
+    } finally {
+      ConsensusSubscriptionTableITSupport.cleanup(ownerConsumer, ids.getTopic(), database);
+    }
+  }
+
+  /**
+   * Builds (without opening) a subscription session against the test environment's IP and port.
+   */
+  private static ISubscriptionTableSession newSubscriptionSession() throws Exception {
+    return new SubscriptionTableSessionBuilder()
+        .host(EnvFactory.getEnv().getIP())
+        .port(Integer.parseInt(EnvFactory.getEnv().getPort()))
+        .build();
+  }
+
+  /**
+   * Drops any existing topic with the given name and recreates it in consensus mode, bound to
+   * the given database/table and column filter, owned by the initial owner.
+   */
+  private static void createOwnedConsensusTopic(
+      final String topicName, final String database, final String table, final String columnFilter)
+      throws Exception {
+    try (final ISubscriptionTableSession session = newSubscriptionSession()) {
+      session.open();
+      session.dropTopicIfExists(topicName);
+
+      final Properties config = new Properties();
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_CONSENSUS_VALUE);
+      config.put(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+      config.put(TopicConstant.DATABASE_KEY, database);
+      config.put(TopicConstant.TABLE_KEY, table);
+      config.put(TopicConstant.COLUMN_FILTER_KEY, columnFilter);
+      config.put(TopicConstant.OWNER_ID_KEY, INITIAL_OWNER_ID);
+      config.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(INITIAL_OWNER_EPOCH));
+      config.put(
+          TopicConstant.OWNER_LEASE_DURATION_MS_KEY, String.valueOf(OWNER_LEASE_DURATION_MS));
+      session.createTopic(topicName, config);
+    }
+  }
+
+  /**
+   * Creates and opens a manual-commit pull consumer that identifies itself with the given owner
+   * id and epoch.
+   *
+   * <p>The owner id is passed explicitly rather than derived from the epoch, so an unexpected
+   * epoch can no longer be mapped silently to the second owner.
+   */
+  private static SubscriptionTablePullConsumer createOwnerConsumer(
+      final String consumerId,
+      final String consumerGroupId,
+      final String ownerId,
+      final long ownerEpoch)
+      throws Exception {
+    final SubscriptionTablePullConsumer consumer =
+        (SubscriptionTablePullConsumer)
+            new SubscriptionTablePullConsumerBuilder()
+                .host(EnvFactory.getEnv().getIP())
+                .port(Integer.parseInt(EnvFactory.getEnv().getPort()))
+                .consumerId(consumerId)
+                .consumerGroupId(consumerGroupId)
+                .ownerId(ownerId)
+                .ownerEpoch(ownerEpoch)
+                .heartbeatIntervalMs(1000)
+                .endpointsSyncIntervalMs(5000)
+                .autoCommit(false)
+                .build();
+    consumer.open();
+    return consumer;
+  }
+
+  /** Reassigns the topic to the given owner id and epoch, keeping the shared lease duration. */
+  private static void transferOwner(
+      final String topicName, final String ownerId, final long ownerEpoch) throws Exception {
+    try (final ISubscriptionTableSession session = newSubscriptionSession()) {
+      session.open();
+      session.alterTopicOwner(topicName, ownerId, ownerEpoch, OWNER_LEASE_DURATION_MS);
+    }
+  }
+
+  /**
+   * Adds a column to the table.
+   *
+   * @param column the column definition appended verbatim to {@code alter table ... add column}
+   */
+  private static void addColumn(final String database, final String table, final String column)
+      throws Exception {
+    try (final ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("use " + database);
+      session.executeNonQueryStatement("alter table " + table + " add column " + column);
+    }
+  }
+
+  /**
+   * Inserts {@code rowCount} consecutive rows that populate tag1, s1, s2, s3 and s4, then
+   * flushes.
+   *
+   * @return the row keys of the inserted rows, in insertion order
+   */
+  private static Set<String> insertRowsWithS4(
+      final String database, final String table, final long startTimestamp, final int rowCount)
+      throws Exception {
+    final Set<String> rowKeys = new LinkedHashSet<>();
+    try (final ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("use " + database);
+      for (int row = 0; row < rowCount; row++) {
+        final long timestamp = startTimestamp + row;
+        // Locale.ROOT keeps the decimal separator of the DOUBLE literal a '.' on every host.
+        session.executeNonQueryStatement(
+            String.format(
+                Locale.ROOT,
+                "insert into %s(tag1, s1, s2, s3, s4, time) "
+                    + "values ('tag_%d', %d, %.1f, %s, %d, %d)",
+                table,
+                timestamp,
+                timestamp * 10L,
+                timestamp + 0.5d,
+                timestamp % 2 == 0 ? "true" : "false",
+                timestamp,
+                timestamp));
+        rowKeys.add(ConsensusSubscriptionTableITSupport.rowKey(database, table, timestamp));
+      }
+      session.executeNonQueryStatement("flush");
+    }
+    return rowKeys;
+  }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
index 7a9e26ab513..14a1a7cb015 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
@@ -221,6 +221,8 @@ public class IoTDBConsensusSubscriptionFilterTableIT
extends AbstractSubscriptio
final String database = ids.getDatabase();
final String table = "t1";
final String schema = "tag1 STRING TAG, s1 INT64 FIELD, s2 DOUBLE FIELD";
+ final String expectedColumnSignature =
+ ConsensusSubscriptionTableITSupport.normalizeColumnSignature("tag1",
"s2");
SubscriptionTablePullConsumer consumer = null;
try {
@@ -263,6 +265,21 @@ public class IoTDBConsensusSubscriptionFilterTableIT
extends AbstractSubscriptio
Assert.assertTrue(consumed.getRowKeys().isEmpty());
Assert.assertTrue(consumed.getSeenColumns().isEmpty());
Assert.assertTrue(consumed.getSeenColumnSignatures().isEmpty());
+
+ ConsensusSubscriptionTableITSupport.alterConsensusTopicColumnFilter(
+ ids.getTopic(), "column_name = \"s2\"");
+ final Set<String> rowsAfterEmptyWindow =
+ ConsensusSubscriptionTableITSupport.insertRows(
+ database, table, 300L, 3, true, false, true);
+ final ConsensusSubscriptionTableITSupport.ConsumedRecords
consumedAfterEmptyWindow =
+ ConsensusSubscriptionTableITSupport.pollAndCommitUntilAtLeast(
+ consumer, rowsAfterEmptyWindow.size(), 50);
+
+ ConsensusSubscriptionTableITSupport.assertExactRowKeys(
+ rowsAfterEmptyWindow, consumedAfterEmptyWindow);
+ Assert.assertEquals(
+ Collections.singleton(expectedColumnSignature),
+ consumedAfterEmptyWindow.getSeenColumnSignatures());
ConsensusSubscriptionTableITSupport.assertNoMoreMessages(consumer, 3,
Duration.ofMillis(500));
} finally {
ConsensusSubscriptionTableITSupport.cleanup(consumer, ids.getTopic(),
database);
@@ -279,7 +296,7 @@ public class IoTDBConsensusSubscriptionFilterTableIT
extends AbstractSubscriptio
final String expectedColumnSignature =
ConsensusSubscriptionTableITSupport.normalizeColumnSignature("tag1",
"s2");
final String columnFilter =
- "NOT (datatype IS NULL)"
+ "datatype IS NOT NULL"
+ " AND column_name != \"s1\""
+ " AND column_name NOT IN (\"s1\", \"s3\")"
+ " AND column_name NOT LIKE \"unknown%\""
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
index 66b83e28dde..9d72aa41756 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
@@ -19,10 +19,19 @@
package org.apache.iotdb.subscription.it.dual.tablemodel;
+import org.apache.iotdb.commons.schema.table.TreeViewSchema;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.subscription.columnfilter.BoundColumnFilter;
+import org.apache.iotdb.db.subscription.columnfilter.ColumnFilterBinder;
+import org.apache.iotdb.db.subscription.columnfilter.ColumnFilterMatcher;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTableArchVerification;
import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
@@ -35,6 +44,7 @@ import
org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
import org.apache.iotdb.subscription.it.dual.AbstractSubscriptionDualIT;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.junit.Assert;
@@ -68,7 +78,7 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
"( CATEGORY = \"field\" AND datatype IN (\"int64\", \"double\") )"
+ " OR \"column_name\" REGEXP \"tag.*\"";
private static final String COMPLEX_OPERATOR_FILTER =
- "NOT (datatype IS NULL)"
+ "datatype IS NOT NULL"
+ " AND column_name != \"s1\""
+ " AND column_name NOT IN (\"s1\", \"s3\")"
+ " AND column_name NOT LIKE \"unknown%\""
@@ -389,6 +399,29 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
}
}
+ /**
+  * Binds a {@code column_name = "s_view"} filter against a tree-view table schema and checks
+  * which column names the resulting matcher accepts: the tag column and the field's original
+  * (source) name match, while the view-side field name does not.
+  */
+ @Test
+ public void testTreeViewColumnFilterBindingUsesSourceFieldName() throws Exception {
+   final String database = databaseName("view");
+   final String viewName = TABLE_NAME + "_view";
+   final Map<String, String> attributes = new LinkedHashMap<>();
+   attributes.put("__system.sql-dialect", "table");
+   attributes.put(TopicConstant.DATABASE_KEY, database);
+   attributes.put(TopicConstant.TABLE_KEY, viewName);
+   attributes.put(TopicConstant.COLUMN_FILTER_KEY, "column_name = \"s_view\"");
+
+   // Collections.singletonMap instead of Map.of: equally immutable, but also available on
+   // Java 8, matching the Arrays.asList / LinkedHashSet idioms used by the other tests here.
+   final BoundColumnFilter boundColumnFilter =
+       new ColumnFilterBinder()
+           .bind(
+               new TopicConfig(attributes),
+               java.util.Collections.singletonMap(
+                   database,
+                   java.util.Collections.singletonMap(
+                       viewName, createTreeViewSchema(viewName))));
+   final ColumnFilterMatcher matcher =
+       ColumnFilterMatcher.fromBoundColumnFilter(boundColumnFilter);
+
+   Assert.assertTrue(matcher.match(database, viewName, "tag1"));
+   Assert.assertTrue(matcher.match(database, viewName, "s_src"));
+   Assert.assertFalse(matcher.match(database, viewName, "s_view"));
+ }
+
@Test
public void testLiveTsFileColumnFilter() throws Exception {
final String database = databaseName("tsfile");
@@ -584,6 +617,46 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
}
}
+ /**
+  * Creates a topic whose table pattern matches two tables, creates the second table only after
+  * the topic exists, restarts the sender cluster, and then checks that rows written to the
+  * second table are delivered with the expected columns and table name.
+  */
+ @Test
+ public void testLiveRecordHandlerRestartResnapshotsNewMatchingTable() throws Exception {
+   final String database = databaseName("restart_new");
+   final String topicName = topicName("restart_new");
+   final String consumerId = consumerName("restart_new");
+   final String consumerGroupId = consumerGroupName("restart_new");
+   final String tableBeforeTopic = TABLE_NAME + "_restart_old";
+   final String tableAfterTopic = TABLE_NAME + "_restart_new";
+
+   // Expectations are independent of the cluster state, so build them up front.
+   final Set<Long> expectedTimestamps = new LinkedHashSet<>(Arrays.asList(240L, 241L, 242L));
+   final Set<String> expectedColumns =
+       new LinkedHashSet<>(Arrays.asList("tag1", "s1", "s2", "s3"));
+   final Set<String> expectedTables = new LinkedHashSet<>(Arrays.asList(tableAfterTopic));
+
+   try {
+     createDatabaseAndTable(senderEnv, database, tableBeforeTopic, TABLE_SCHEMA);
+     createTopic(
+         topicName,
+         database,
+         TABLE_NAME + "_restart_.*",
+         TopicConstant.FORMAT_RECORD_HANDLER_VALUE,
+         FIELD_CATEGORY_FILTER);
+     // The second matching table only comes into existence after the topic was created.
+     createTable(senderEnv, database, tableAfterTopic, TABLE_SCHEMA);
+
+     TestUtils.restartCluster(senderEnv);
+
+     try (final ISubscriptionTablePullConsumer consumer =
+         createConsumer(consumerId, consumerGroupId)) {
+       consumer.subscribe(topicName);
+       insertRows(senderEnv, database, tableAfterTopic, 240, 243);
+
+       final ConsumedRecordStats consumedStats =
+           pollRecordMessagesForTimestamps(consumer, expectedTimestamps, false);
+
+       Assert.assertEquals(expectedTimestamps, consumedStats.timestamps);
+       Assert.assertEquals(expectedColumns, consumedStats.columnNames);
+       Assert.assertEquals(expectedTables, consumedStats.tableNames);
+     }
+   } finally {
+     cleanup(topicName, database);
+   }
+ }
+
@Test
public void testLiveRecordHandlerStrictSnapshotRequiresAlterAfterAddColumn()
throws Exception {
final String database = databaseName("strict");
@@ -635,6 +708,52 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
}
}
+ /**
+  * Subscribes with a column filter that combines IS NULL / IS NOT NULL, IN / NOT IN, NOT LIKE
+  * and NOT REGEXP predicates, and checks that only {@code tag1} and {@code s2} are delivered
+  * for the inserted rows.
+  */
+ @Test
+ public void testLiveRecordHandlerExpressionCoverageAndTagRetention() throws Exception {
+   final String database = databaseName("expr");
+   final String topicName = topicName("expr");
+   final String consumerId = consumerName("expr");
+   final String consumerGroupId = consumerGroupName("expr");
+   final String expressionFilter =
+       "database IS NOT NULL"
+           + " AND table_name IS NOT NULL"
+           + " AND (database IS NULL OR datatype IN (\"DOUBLE\", \"FLOAT\", \"INT64\"))"
+           + " AND column_name NOT IN (\"s1\", \"s3\")"
+           + " AND column_name NOT LIKE \"unknown%\""
+           + " AND column_name NOT REGEXP \"unknown.*\"";
+   final Set<Long> expectedTimestamps = new LinkedHashSet<>(Arrays.asList(250L, 251L, 252L));
+   final Set<String> expectedColumns = new LinkedHashSet<>(Arrays.asList("tag1", "s2"));
+
+   try {
+     createDatabaseAndTable(senderEnv, database, TABLE_NAME, TABLE_SCHEMA);
+     createTopic(
+         topicName,
+         database,
+         TABLE_NAME,
+         TopicConstant.FORMAT_RECORD_HANDLER_VALUE,
+         expressionFilter);
+
+     try (final ISubscriptionTablePullConsumer consumer =
+         createConsumer(consumerId, consumerGroupId)) {
+       consumer.subscribe(topicName);
+       insertRows(senderEnv, database, TABLE_NAME, 250, 253);
+
+       final ConsumedRecordStats consumedStats =
+           pollRecordMessagesForTimestamps(consumer, expectedTimestamps, false);
+
+       Assert.assertEquals(expectedTimestamps, consumedStats.timestamps);
+       Assert.assertEquals(expectedColumns, consumedStats.columnNames);
+       Assert.assertTrue(
+           "TAG column must be retained when only FIELD matches",
+           consumedStats.columnNames.contains("tag1"));
+       // Explicit negative checks keep the individual exclusions visible in a failure report.
+       for (final String excludedColumn : Arrays.asList("time", "s1", "s3")) {
+         Assert.assertFalse(consumedStats.columnNames.contains(excludedColumn));
+       }
+     }
+   } finally {
+     cleanup(topicName, database);
+   }
+ }
+
@Test
public void testLiveRecordHandlerColumnFilterMatchesCustomTimeColumn()
throws Exception {
final String database = databaseName("custom_time");
@@ -840,6 +959,17 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
}
}
+ /**
+  * Builds an in-memory tree-view table schema with a time column, a {@code tag1} tag column and
+  * one DOUBLE field named {@code s_view} whose original name is set to {@code s_src}.
+  *
+  * @param viewName name given to the created table schema
+  * @return the populated schema
+  */
+ private static TsTable createTreeViewSchema(final String viewName) {
+   final TsTable viewSchema = new TsTable(viewName);
+   viewSchema.addProp(TreeViewSchema.TREE_PATH_PATTERN, "root.view.**");
+
+   viewSchema.addColumnSchema(new TimeColumnSchema("time", TSDataType.TIMESTAMP));
+   viewSchema.addColumnSchema(new TagColumnSchema("tag1", TSDataType.STRING));
+
+   // The original name is attached before the field is registered with the table.
+   final FieldColumnSchema renamedField = new FieldColumnSchema("s_view", TSDataType.DOUBLE);
+   TreeViewSchema.setOriginalName(renamedField, "s_src");
+   viewSchema.addColumnSchema(renamedField);
+
+   return viewSchema;
+ }
+
private void createTopic(
final String topicName,
final String database,
@@ -1182,6 +1312,7 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
for (final ResultSet resultSet : message.getResultSets()) {
final SubscriptionRecordHandler.SubscriptionResultSet
subscriptionResultSet =
(SubscriptionRecordHandler.SubscriptionResultSet) resultSet;
+ stats.tableNames.add(subscriptionResultSet.getTableName());
subscriptionResultSet
.getColumnNames()
.forEach(columnName ->
stats.columnNames.add(columnName.toLowerCase(Locale.ROOT)));
@@ -1312,6 +1443,7 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
private static final class ConsumedRecordStats {
private final Set<String> columnNames = new LinkedHashSet<>();
+ private final Set<String> tableNames = new LinkedHashSet<>();
private final Set<Long> timestamps = new LinkedHashSet<>();
private final Map<Long, Boolean> timeSelectedByTimestamp = new
LinkedHashMap<>();
private int rowCount;
@@ -1321,6 +1453,8 @@ public class IoTDBSubscriptionColumnFilterIT extends
AbstractSubscriptionDualIT
return "ConsumedRecordStats{"
+ "columnNames="
+ columnNames
+ + ", tableNames="
+ + tableNames
+ ", timestamps="
+ timestamps
+ ", timeSelectedByTimestamp="