This is an automated email from the ASF dual-hosted git repository.

VGalaxies pushed a commit to branch feature/subscription-column-filter
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 05dd8a01e4691d249baeb1d894636a0aabb61380
Author: Codex <codex@localhost>
AuthorDate: Fri Jun 12 17:30:10 2026 +0000

    Add subscription column filter IT follow-ups
---
 .../ConsensusSubscriptionTableITSupport.java       |  26 +++
 ...ConsensusSubscriptionColumnFilterClusterIT.java | 232 +++++++++++++++++++++
 .../IoTDBConsensusSubscriptionFilterTableIT.java   |  19 +-
 .../IoTDBSubscriptionColumnFilterIT.java           | 136 +++++++++++-
 4 files changed, 411 insertions(+), 2 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
index ff006b8c4ee..ace3e700dbe 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/ConsensusSubscriptionTableITSupport.java
@@ -291,6 +291,32 @@ final class ConsensusSubscriptionTableITSupport {
     return consumed;
   }
 
+  /**
+   * Polls the consumer and synchronously commits every non-empty batch until the accumulated
+   * records contain all of {@code expectedRowKeys}, or until {@code maxPollRounds} polls have been
+   * issued.
+   *
+   * <p>Only the presence of every expected key is checked, so additional consumed rows do not
+   * prevent an early return. Whenever {@code maxPollRounds} is positive at least one poll is
+   * issued, even if the expectation is empty.
+   *
+   * @param consumer pull consumer to poll and commit on
+   * @param expectedRowKeys row keys that must all be observed before returning early
+   * @param maxPollRounds upper bound on the number of poll calls
+   * @return everything consumed during this call, whether or not the expectation was met
+   * @throws Exception if polling, consuming or committing fails
+   */
+  static ConsumedRecords pollAndCommitUntilContains(
+      final SubscriptionTablePullConsumer consumer,
+      final Set<String> expectedRowKeys,
+      final int maxPollRounds)
+      throws Exception {
+    final ConsumedRecords accumulated = new ConsumedRecords();
+
+    for (int attempt = 0; attempt < maxPollRounds; attempt++) {
+      final List<SubscriptionMessage> batch = consumer.poll(DEFAULT_POLL_TIMEOUT);
+      if (!batch.isEmpty()) {
+        accumulated.merge(consumeMessages(batch));
+        consumer.commitSync(batch);
+      }
+      // Evaluated after every round, including empty ones, so an already satisfied expectation
+      // ends the loop without waiting for further data.
+      if (accumulated.getRowKeys().containsAll(expectedRowKeys)) {
+        break;
+      }
+    }
+
+    return accumulated;
+  }
+
   static ConsumedRecords pollWithInfoAndCommitUntilAtLeast(
       final SubscriptionTablePullConsumer consumer,
       final Set<String> topicNames,
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java
new file mode 100644
index 00000000000..d2fd5552ee7
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionColumnFilterClusterIT.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.consensus.local.tablemodel;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import 
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import org.apache.iotdb.subscription.it.AbstractSubscriptionIT;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableClusterIT.class})
+public class IoTDBConsensusSubscriptionColumnFilterClusterIT extends 
AbstractSubscriptionIT {
+
+  private static final long OWNER_LEASE_DURATION_MS = 60_000L;
+
+  /**
+   * Runs the base-class setup, applies the cluster configuration for this test and then starts
+   * the environment via {@code initClusterEnvironment(1, 3)}.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    configureCluster();
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  /**
+   * Applies the common configuration used by this test: Ratis consensus for the ConfigNode and
+   * schema regions, IoT consensus for data regions, a data replication factor of 2, and
+   * subscription enabled with pipe memory management and the pipe memory check switched off.
+   */
+  private static void configureCluster() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setSchemaReplicationFactor(1)
+        .setDataReplicationFactor(2)
+        .setAutoCreateSchemaEnabled(true)
+        .setSubscriptionEnabled(true)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setSubscriptionOwnerLeaseDurationMsMin(1000);
+  }
+
+  /**
+   * Cleans the cluster environment and then runs the base-class teardown.
+   *
+   * @throws Exception if cluster cleanup or the base-class teardown fails
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      EnvFactory.getEnv().cleanClusterEnvironment();
+    } finally {
+      // Always run the base-class teardown, even when cluster cleanup throws, so that state set
+      // up by super.setUp() is not left behind for the next test.
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Verifies that an altered column filter is applied for a new owner after the topic owner has
+   * been transferred.
+   *
+   * <p>Flow: create a consensus topic owned by {@code owner1} with filter
+   * {@code column_name = "s1"}, consume three rows and check that only the {@code tag1, s1}
+   * signature is seen; close that consumer, add column {@code s4}, transfer the topic to
+   * {@code owner2} (epoch 2), alter the filter to {@code category = "FIELD"}, then consume three
+   * more rows with a new consumer and check that the {@code tag1, s1, s2, s3, s4} signature is
+   * seen.
+   */
+  @Test
+  public void testAlterColumnFilterRebindsAfterOwnerTransferOnThreeDataNodes() throws Exception {
+    final ConsensusSubscriptionTableITSupport.TestIdentifiers ids =
+        ConsensusSubscriptionTableITSupport.newIdentifiers("cluster_owner_rebind");
+    final String database = ids.getDatabase();
+    final String table = "t1";
+    final String schema = "tag1 STRING TAG, s1 INT64 FIELD, s2 DOUBLE FIELD, s3 BOOLEAN FIELD";
+    SubscriptionTablePullConsumer ownerConsumer = null;
+
+    try {
+      ConsensusSubscriptionTableITSupport.createDatabaseAndTable(database, table, schema);
+      ConsensusSubscriptionTableITSupport.insertRows(database, table, 0L, 1, true, true, true);
+      createOwnedConsensusTopic(ids.getTopic(), database, table, "column_name = \"s1\"");
+
+      ownerConsumer = createOwnerConsumer(ids.consumer("owner1"), ids.consumerGroup("owner"), 1L);
+      ownerConsumer.subscribe(ids.getTopic());
+
+      final Set<String> rowsBeforeTransfer =
+          ConsensusSubscriptionTableITSupport.insertRows(
+              database, table, 100L, 3, true, true, true);
+      final ConsensusSubscriptionTableITSupport.ConsumedRecords beforeTransfer =
+          ConsensusSubscriptionTableITSupport.pollAndCommitUntilAtLeast(
+              ownerConsumer, rowsBeforeTransfer.size(), 60);
+
+      ConsensusSubscriptionTableITSupport.assertExactRowKeys(rowsBeforeTransfer, beforeTransfer);
+      Assert.assertEquals(
+          Collections.singleton(
+              ConsensusSubscriptionTableITSupport.normalizeColumnSignature("tag1", "s1")),
+          beforeTransfer.getSeenColumnSignatures());
+
+      // Release the first owner before handing the topic over; the reference is cleared so the
+      // finally block does not try to clean up an already closed consumer.
+      ownerConsumer.unsubscribe(ids.getTopic());
+      ownerConsumer.close();
+      ownerConsumer = null;
+
+      addColumn(database, table, "s4 INT32 FIELD");
+      transferOwner(ids.getTopic(), "owner2", 2L);
+      ConsensusSubscriptionTableITSupport.alterConsensusTopicColumnFilter(
+          ids.getTopic(), "category = \"FIELD\"");
+
+      ownerConsumer = createOwnerConsumer(ids.consumer("owner2"), ids.consumerGroup("owner"), 2L);
+      ownerConsumer.subscribe(ids.getTopic());
+
+      final Set<String> rowsAfterTransfer = insertRowsWithS4(database, table, 200L, 3);
+      final ConsensusSubscriptionTableITSupport.ConsumedRecords afterTransfer =
+          ConsensusSubscriptionTableITSupport.pollAndCommitUntilContains(
+              ownerConsumer, rowsAfterTransfer, 80);
+
+      Assert.assertTrue(
+          "Missing post-transfer rows. Consumed records: " + afterTransfer,
+          afterTransfer.getRowKeys().containsAll(rowsAfterTransfer));
+      // Carries the consumed records in the failure message, like the row-key assertion above,
+      // so a failing run shows which signatures were actually delivered.
+      Assert.assertTrue(
+          "Missing post-alter column signature. Consumed records: " + afterTransfer,
+          afterTransfer
+              .getSeenColumnSignatures()
+              .contains(
+                  ConsensusSubscriptionTableITSupport.normalizeColumnSignature(
+                      "tag1", "s1", "s2", "s3", "s4")));
+    } finally {
+      ConsensusSubscriptionTableITSupport.cleanup(ownerConsumer, ids.getTopic(), database);
+    }
+  }
+
+  /**
+   * Drops any existing topic with the given name and recreates it as a consensus-mode topic over
+   * {@code database}/{@code table}, using the given column filter and owned by {@code owner1} at
+   * epoch 1 with a lease of {@link #OWNER_LEASE_DURATION_MS}.
+   *
+   * @param topicName name of the topic to (re)create
+   * @param database value stored under the topic's database key
+   * @param table value stored under the topic's table key
+   * @param columnFilter column filter expression stored on the topic
+   * @throws Exception if the session cannot be opened or the topic cannot be dropped or created
+   */
+  private static void createOwnedConsensusTopic(
+      final String topicName, final String database, final String table, final String columnFilter)
+      throws Exception {
+    // The attributes do not depend on the session, so they are assembled before it is opened.
+    final Properties topicAttributes = new Properties();
+    topicAttributes.put(TopicConstant.MODE_KEY, TopicConstant.MODE_CONSENSUS_VALUE);
+    topicAttributes.put(
+        TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_SESSION_DATA_SETS_HANDLER_VALUE);
+    topicAttributes.put(TopicConstant.DATABASE_KEY, database);
+    topicAttributes.put(TopicConstant.TABLE_KEY, table);
+    topicAttributes.put(TopicConstant.COLUMN_FILTER_KEY, columnFilter);
+    topicAttributes.put(TopicConstant.OWNER_ID_KEY, "owner1");
+    topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "1");
+    topicAttributes.put(
+        TopicConstant.OWNER_LEASE_DURATION_MS_KEY, String.valueOf(OWNER_LEASE_DURATION_MS));
+
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final ISubscriptionTableSession topicSession =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      topicSession.open();
+      topicSession.dropTopicIfExists(topicName);
+      topicSession.createTopic(topicName, topicAttributes);
+    }
+  }
+
+  /**
+   * Builds and opens a manual-commit pull consumer that identifies itself as a topic owner.
+   *
+   * <p>The owner id is derived from the epoch: epoch 1 maps to {@code owner1}, every other epoch
+   * to {@code owner2}.
+   *
+   * @param consumerId id of the consumer
+   * @param consumerGroupId id of the consumer group
+   * @param ownerEpoch owner epoch announced by the consumer
+   * @return the opened consumer; the caller is responsible for closing it
+   * @throws Exception if the consumer cannot be built or opened
+   */
+  private static SubscriptionTablePullConsumer createOwnerConsumer(
+      final String consumerId, final String consumerGroupId, final long ownerEpoch)
+      throws Exception {
+    final String ownerId;
+    if (ownerEpoch == 1L) {
+      ownerId = "owner1";
+    } else {
+      ownerId = "owner2";
+    }
+
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    final SubscriptionTablePullConsumer ownerConsumer =
+        (SubscriptionTablePullConsumer)
+            new SubscriptionTablePullConsumerBuilder()
+                .host(host)
+                .port(port)
+                .consumerId(consumerId)
+                .consumerGroupId(consumerGroupId)
+                .ownerId(ownerId)
+                .ownerEpoch(ownerEpoch)
+                .heartbeatIntervalMs(1000)
+                .endpointsSyncIntervalMs(5000)
+                .autoCommit(false)
+                .build();
+    ownerConsumer.open();
+    return ownerConsumer;
+  }
+
+  /**
+   * Opens a short-lived subscription session and alters the owner of the given topic, using
+   * {@link #OWNER_LEASE_DURATION_MS} as the lease duration.
+   *
+   * @param topicName topic whose owner is changed
+   * @param ownerId id of the new owner
+   * @param ownerEpoch epoch of the new owner
+   * @throws Exception if the session cannot be opened or the owner cannot be altered
+   */
+  private static void transferOwner(
+      final String topicName, final String ownerId, final long ownerEpoch) throws Exception {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final ISubscriptionTableSession ownerSession =
+        new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
+      ownerSession.open();
+      ownerSession.alterTopicOwner(topicName, ownerId, ownerEpoch, OWNER_LEASE_DURATION_MS);
+    }
+  }
+
+  /**
+   * Adds a column to an existing table through a table-model session.
+   *
+   * @param database database that is selected with {@code use} before altering
+   * @param table table to alter
+   * @param column column definition appended after {@code add column}, e.g. {@code s4 INT32 FIELD}
+   * @throws Exception if the session cannot be obtained or a statement fails
+   */
+  private static void addColumn(final String database, final String table, final String column)
+      throws Exception {
+    final String useDatabaseSql = "use " + database;
+    final String addColumnSql = "alter table " + table + " add column " + column;
+    try (final ITableSession tableSession = EnvFactory.getEnv().getTableSessionConnection()) {
+      tableSession.executeNonQueryStatement(useDatabaseSql);
+      tableSession.executeNonQueryStatement(addColumnSql);
+    }
+  }
+
+  /**
+   * Inserts {@code rowCount} consecutive rows that populate {@code tag1}, {@code s1}, {@code s2},
+   * {@code s3} and {@code s4}, then issues a {@code flush}.
+   *
+   * <p>For a row at timestamp {@code t} the values are: {@code tag1 = 'tag_t'},
+   * {@code s1 = t * 10}, {@code s2 = t + 0.5}, {@code s3 = (t is even)} and {@code s4 = t}.
+   *
+   * @param database database that is selected with {@code use} before inserting
+   * @param table target table
+   * @param startTimestamp timestamp of the first row; later rows increase it by one each
+   * @param rowCount number of rows to insert
+   * @return the row keys of the inserted rows, in insertion order
+   * @throws Exception if the session cannot be obtained or a statement fails
+   */
+  private static Set<String> insertRowsWithS4(
+      final String database, final String table, final long startTimestamp, final int rowCount)
+      throws Exception {
+    final String insertTemplate =
+        "insert into %s(tag1, s1, s2, s3, s4, time) values ('tag_%d', %d, %.1f, %s, %d, %d)";
+    final Set<String> insertedRowKeys = new LinkedHashSet<>();
+
+    try (final ITableSession tableSession = EnvFactory.getEnv().getTableSessionConnection()) {
+      tableSession.executeNonQueryStatement("use " + database);
+      for (int offset = 0; offset < rowCount; offset++) {
+        final long timestamp = startTimestamp + offset;
+        final String booleanLiteral = String.valueOf(timestamp % 2 == 0);
+        // Locale.ROOT keeps the decimal separator of s2 a dot regardless of the JVM locale.
+        final String insertSql =
+            String.format(
+                Locale.ROOT,
+                insertTemplate,
+                table,
+                timestamp,
+                timestamp * 10L,
+                timestamp + 0.5d,
+                booleanLiteral,
+                timestamp,
+                timestamp);
+        tableSession.executeNonQueryStatement(insertSql);
+        insertedRowKeys.add(ConsensusSubscriptionTableITSupport.rowKey(database, table, timestamp));
+      }
+      tableSession.executeNonQueryStatement("flush");
+    }
+    return insertedRowKeys;
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
index 7a9e26ab513..14a1a7cb015 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/consensus/local/tablemodel/IoTDBConsensusSubscriptionFilterTableIT.java
@@ -221,6 +221,8 @@ public class IoTDBConsensusSubscriptionFilterTableIT 
extends AbstractSubscriptio
     final String database = ids.getDatabase();
     final String table = "t1";
     final String schema = "tag1 STRING TAG, s1 INT64 FIELD, s2 DOUBLE FIELD";
+    final String expectedColumnSignature =
+        ConsensusSubscriptionTableITSupport.normalizeColumnSignature("tag1", 
"s2");
     SubscriptionTablePullConsumer consumer = null;
 
     try {
@@ -263,6 +265,21 @@ public class IoTDBConsensusSubscriptionFilterTableIT 
extends AbstractSubscriptio
       Assert.assertTrue(consumed.getRowKeys().isEmpty());
       Assert.assertTrue(consumed.getSeenColumns().isEmpty());
       Assert.assertTrue(consumed.getSeenColumnSignatures().isEmpty());
+
+      ConsensusSubscriptionTableITSupport.alterConsensusTopicColumnFilter(
+          ids.getTopic(), "column_name = \"s2\"");
+      final Set<String> rowsAfterEmptyWindow =
+          ConsensusSubscriptionTableITSupport.insertRows(
+              database, table, 300L, 3, true, false, true);
+      final ConsensusSubscriptionTableITSupport.ConsumedRecords 
consumedAfterEmptyWindow =
+          ConsensusSubscriptionTableITSupport.pollAndCommitUntilAtLeast(
+              consumer, rowsAfterEmptyWindow.size(), 50);
+
+      ConsensusSubscriptionTableITSupport.assertExactRowKeys(
+          rowsAfterEmptyWindow, consumedAfterEmptyWindow);
+      Assert.assertEquals(
+          Collections.singleton(expectedColumnSignature),
+          consumedAfterEmptyWindow.getSeenColumnSignatures());
       ConsensusSubscriptionTableITSupport.assertNoMoreMessages(consumer, 3, 
Duration.ofMillis(500));
     } finally {
       ConsensusSubscriptionTableITSupport.cleanup(consumer, ids.getTopic(), 
database);
@@ -279,7 +296,7 @@ public class IoTDBConsensusSubscriptionFilterTableIT 
extends AbstractSubscriptio
     final String expectedColumnSignature =
         ConsensusSubscriptionTableITSupport.normalizeColumnSignature("tag1", 
"s2");
     final String columnFilter =
-        "NOT (datatype IS NULL)"
+        "datatype IS NOT NULL"
             + " AND column_name != \"s1\""
             + " AND column_name NOT IN (\"s1\", \"s3\")"
             + " AND column_name NOT LIKE \"unknown%\""
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
index 66b83e28dde..9d72aa41756 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/tablemodel/IoTDBSubscriptionColumnFilterIT.java
@@ -19,10 +19,19 @@
 
 package org.apache.iotdb.subscription.it.dual.tablemodel;
 
+import org.apache.iotdb.commons.schema.table.TreeViewSchema;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.subscription.columnfilter.BoundColumnFilter;
+import org.apache.iotdb.db.subscription.columnfilter.ColumnFilterBinder;
+import org.apache.iotdb.db.subscription.columnfilter.ColumnFilterMatcher;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionTableArchVerification;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
 import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
@@ -35,6 +44,7 @@ import 
org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import org.apache.iotdb.subscription.it.dual.AbstractSubscriptionDualIT;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.read.query.dataset.ResultSet;
 import org.junit.Assert;
@@ -68,7 +78,7 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
       "( CATEGORY = \"field\" AND datatype IN (\"int64\", \"double\") )"
           + " OR \"column_name\" REGEXP \"tag.*\"";
   private static final String COMPLEX_OPERATOR_FILTER =
-      "NOT (datatype IS NULL)"
+      "datatype IS NOT NULL"
           + " AND column_name != \"s1\""
           + " AND column_name NOT IN (\"s1\", \"s3\")"
           + " AND column_name NOT LIKE \"unknown%\""
@@ -389,6 +399,29 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
     }
   }
 
+  /**
+   * Binds the filter {@code column_name = "s_view"} against a tree-view table whose field
+   * {@code s_view} has the original name {@code s_src}, and checks that the resulting matcher
+   * accepts {@code tag1} and {@code s_src} but not {@code s_view}.
+   */
+  @Test
+  public void testTreeViewColumnFilterBindingUsesSourceFieldName() throws Exception {
+    final String database = databaseName("view");
+    final String viewName = TABLE_NAME + "_view";
+
+    final Map<String, String> topicAttributes = new LinkedHashMap<>();
+    topicAttributes.put("__system.sql-dialect", "table");
+    topicAttributes.put(TopicConstant.DATABASE_KEY, database);
+    topicAttributes.put(TopicConstant.TABLE_KEY, viewName);
+    topicAttributes.put(TopicConstant.COLUMN_FILTER_KEY, "column_name = \"s_view\"");
+
+    final TopicConfig topicConfig = new TopicConfig(topicAttributes);
+    final TsTable viewSchema = createTreeViewSchema(viewName);
+    final BoundColumnFilter bound =
+        new ColumnFilterBinder().bind(topicConfig, Map.of(database, Map.of(viewName, viewSchema)));
+    final ColumnFilterMatcher matcher = ColumnFilterMatcher.fromBoundColumnFilter(bound);
+
+    Assert.assertTrue(matcher.match(database, viewName, "tag1"));
+    Assert.assertTrue(matcher.match(database, viewName, "s_src"));
+    Assert.assertFalse(matcher.match(database, viewName, "s_view"));
+  }
+
   @Test
   public void testLiveTsFileColumnFilter() throws Exception {
     final String database = databaseName("tsfile");
@@ -584,6 +617,46 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
     }
   }
 
+  /**
+   * Checks that a table created after the topic, but matching the topic's table pattern, is
+   * delivered once the sender cluster has been restarted.
+   *
+   * <p>The topic is created while only the {@code _restart_old} table exists; the
+   * {@code _restart_new} table is created afterwards and the sender is restarted. Rows written to
+   * the new table must then arrive with columns {@code tag1, s1, s2, s3} and with the new table
+   * as the only table name seen.
+   */
+  @Test
+  public void testLiveRecordHandlerRestartResnapshotsNewMatchingTable() throws Exception {
+    final String database = databaseName("restart_new");
+    final String topicName = topicName("restart_new");
+    final String consumerId = consumerName("restart_new");
+    final String consumerGroupId = consumerGroupName("restart_new");
+    final String initialTableName = TABLE_NAME + "_restart_old";
+    final String newTableName = TABLE_NAME + "_restart_new";
+    final String tablePattern = TABLE_NAME + "_restart_.*";
+
+    final Set<Long> expectedTimestamps = new LinkedHashSet<>(Arrays.asList(240L, 241L, 242L));
+    final Set<String> expectedColumns =
+        new LinkedHashSet<>(Arrays.asList("tag1", "s1", "s2", "s3"));
+    final Set<String> expectedTables = new LinkedHashSet<>(Arrays.asList(newTableName));
+
+    try {
+      createDatabaseAndTable(senderEnv, database, initialTableName, TABLE_SCHEMA);
+      createTopic(
+          topicName,
+          database,
+          tablePattern,
+          TopicConstant.FORMAT_RECORD_HANDLER_VALUE,
+          FIELD_CATEGORY_FILTER);
+      // Created only after the topic exists, so it is not among the tables present at creation.
+      createTable(senderEnv, database, newTableName, TABLE_SCHEMA);
+
+      TestUtils.restartCluster(senderEnv);
+
+      try (final ISubscriptionTablePullConsumer pullConsumer =
+          createConsumer(consumerId, consumerGroupId)) {
+        pullConsumer.subscribe(topicName);
+        insertRows(senderEnv, database, newTableName, 240, 243);
+
+        final ConsumedRecordStats consumedStats =
+            pollRecordMessagesForTimestamps(pullConsumer, expectedTimestamps, false);
+
+        Assert.assertEquals(expectedTimestamps, consumedStats.timestamps);
+        Assert.assertEquals(expectedColumns, consumedStats.columnNames);
+        Assert.assertEquals(expectedTables, consumedStats.tableNames);
+      }
+    } finally {
+      cleanup(topicName, database);
+    }
+  }
+
   @Test
   public void testLiveRecordHandlerStrictSnapshotRequiresAlterAfterAddColumn() 
throws Exception {
     final String database = databaseName("strict");
@@ -635,6 +708,52 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
     }
   }
 
+  /**
+   * Exercises a filter combining {@code IS NOT NULL}, {@code IS NULL}, {@code OR}, {@code IN},
+   * {@code NOT IN}, {@code NOT LIKE} and {@code NOT REGEXP}, and checks that the delivered
+   * columns are exactly {@code tag1} and {@code s2}: the tag column is kept although only a field
+   * matches, and {@code time}, {@code s1} and {@code s3} are absent.
+   */
+  @Test
+  public void testLiveRecordHandlerExpressionCoverageAndTagRetention() throws Exception {
+    final String database = databaseName("expr");
+    final String topicName = topicName("expr");
+    final String consumerId = consumerName("expr");
+    final String consumerGroupId = consumerGroupName("expr");
+    final String expressionFilter =
+        "database IS NOT NULL"
+            + " AND table_name IS NOT NULL"
+            + " AND (database IS NULL OR datatype IN (\"DOUBLE\", \"FLOAT\", \"INT64\"))"
+            + " AND column_name NOT IN (\"s1\", \"s3\")"
+            + " AND column_name NOT LIKE \"unknown%\""
+            + " AND column_name NOT REGEXP \"unknown.*\"";
+    final Set<Long> expectedTimestamps = new LinkedHashSet<>(Arrays.asList(250L, 251L, 252L));
+    final Set<String> expectedColumns = new LinkedHashSet<>(Arrays.asList("tag1", "s2"));
+
+    try {
+      createDatabaseAndTable(senderEnv, database, TABLE_NAME, TABLE_SCHEMA);
+      createTopic(
+          topicName,
+          database,
+          TABLE_NAME,
+          TopicConstant.FORMAT_RECORD_HANDLER_VALUE,
+          expressionFilter);
+
+      try (final ISubscriptionTablePullConsumer pullConsumer =
+          createConsumer(consumerId, consumerGroupId)) {
+        pullConsumer.subscribe(topicName);
+        insertRows(senderEnv, database, TABLE_NAME, 250, 253);
+
+        final ConsumedRecordStats consumedStats =
+            pollRecordMessagesForTimestamps(pullConsumer, expectedTimestamps, false);
+
+        Assert.assertEquals(expectedTimestamps, consumedStats.timestamps);
+        Assert.assertEquals(expectedColumns, consumedStats.columnNames);
+        // Already implied by the exact-set assertion above; kept as explicit statements of intent.
+        Assert.assertTrue(
+            "TAG column must be retained when only FIELD matches",
+            consumedStats.columnNames.contains("tag1"));
+        for (final String excludedColumn : Arrays.asList("time", "s1", "s3")) {
+          Assert.assertFalse(consumedStats.columnNames.contains(excludedColumn));
+        }
+      }
+    } finally {
+      cleanup(topicName, database);
+    }
+  }
+
   @Test
   public void testLiveRecordHandlerColumnFilterMatchesCustomTimeColumn() 
throws Exception {
     final String database = databaseName("custom_time");
@@ -840,6 +959,17 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
     }
   }
 
+  /**
+   * Builds an in-memory tree-view table schema for binder tests.
+   *
+   * <p>The table carries the tree path pattern {@code root.view.**} and three columns, added in
+   * this order: {@code time} (TIMESTAMP), tag {@code tag1} (STRING) and field {@code s_view}
+   * (DOUBLE) whose original name is set to {@code s_src}.
+   *
+   * @param viewName name of the table to create
+   * @return the populated table schema
+   */
+  private static TsTable createTreeViewSchema(final String viewName) {
+    final FieldColumnSchema renamedField = new FieldColumnSchema("s_view", TSDataType.DOUBLE);
+    TreeViewSchema.setOriginalName(renamedField, "s_src");
+
+    final TsTable viewTable = new TsTable(viewName);
+    viewTable.addProp(TreeViewSchema.TREE_PATH_PATTERN, "root.view.**");
+    viewTable.addColumnSchema(new TimeColumnSchema("time", TSDataType.TIMESTAMP));
+    viewTable.addColumnSchema(new TagColumnSchema("tag1", TSDataType.STRING));
+    viewTable.addColumnSchema(renamedField);
+    return viewTable;
+  }
+
   private void createTopic(
       final String topicName,
       final String database,
@@ -1182,6 +1312,7 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
       for (final ResultSet resultSet : message.getResultSets()) {
         final SubscriptionRecordHandler.SubscriptionResultSet 
subscriptionResultSet =
             (SubscriptionRecordHandler.SubscriptionResultSet) resultSet;
+        stats.tableNames.add(subscriptionResultSet.getTableName());
         subscriptionResultSet
             .getColumnNames()
             .forEach(columnName -> 
stats.columnNames.add(columnName.toLowerCase(Locale.ROOT)));
@@ -1312,6 +1443,7 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
 
   private static final class ConsumedRecordStats {
     private final Set<String> columnNames = new LinkedHashSet<>();
+    private final Set<String> tableNames = new LinkedHashSet<>();
     private final Set<Long> timestamps = new LinkedHashSet<>();
     private final Map<Long, Boolean> timeSelectedByTimestamp = new 
LinkedHashMap<>();
     private int rowCount;
@@ -1321,6 +1453,8 @@ public class IoTDBSubscriptionColumnFilterIT extends 
AbstractSubscriptionDualIT
       return "ConsumedRecordStats{"
           + "columnNames="
           + columnNames
+          + ", tableNames="
+          + tableNames
           + ", timestamps="
           + timestamps
           + ", timeSelectedByTimestamp="

Reply via email to