This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this 
push:
     new b301184f07a Subscription IT: fix data insertion logic in 
SubscriptionConsumerGroupIT (#12863)
b301184f07a is described below

commit b301184f07a5406ef9614d3d7efbe4dcf8a18d7e
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 8 17:23:52 2024 +0800

    Subscription IT: fix data insertion logic in SubscriptionConsumerGroupIT 
(#12863)
---
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  | 48 +++++++++++++---------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index e3325749250..fffca035ec5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -954,7 +954,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                                 while (dataSet.hasNext()) {
                                   final RowRecord record = dataSet.next();
                                   if 
(!insertRowRecordEnrichedByConsumerGroupId(
-                                      columnNameList, record, 
consumerGroupId)) {
+                                      columnNameList, record.getTimestamp(), 
consumerGroupId)) {
                                     receiverCrashed.set(true);
                                     throw new RuntimeException("detect 
receiver crashed");
                                   }
@@ -975,12 +975,12 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                                             null));
                                 while (dataSet.hasNext()) {
                                   final RowRecord record = dataSet.next();
-                                  if 
(!insertRowRecordEnrichedByConsumerGroupId(
-                                      dataSet.getPaths().get(0).toString(),
-                                      record,
-                                      consumerGroupId)) {
-                                    receiverCrashed.set(true);
-                                    throw new RuntimeException("detect 
receiver crashed");
+                                  for (final Path path : dataSet.getPaths()) {
+                                    if 
(!insertRowRecordEnrichedByConsumerGroupId(
+                                        path.toString(), 
record.getTimestamp(), consumerGroupId)) {
+                                      receiverCrashed.set(true);
+                                      throw new RuntimeException("detect 
receiver crashed");
+                                    }
                                   }
                                 }
                               }
@@ -1050,39 +1050,47 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
    * @return false -> receiver crashed
    */
   private boolean insertRowRecordEnrichedByConsumerGroupId(
-      final List<String> columnNameList, final RowRecord record, final String 
consumerGroupId)
+      final List<String> columnNameList, final long timestamp, final String 
consumerGroupId)
       throws Exception {
-    if (columnNameList.size() != 2) {
+    final int columnSize = columnNameList.size();
+    if (columnSize <= 1) { // only with time column
       LOGGER.warn("unexpected column name list: {}", columnNameList);
       throw new Exception("unexpected column name list");
     }
-    final String columnName = columnNameList.get(1);
-    return insertRowRecordEnrichedByConsumerGroupId(columnName, record, 
consumerGroupId);
+
+    for (int columnIndex = 1; columnIndex < columnSize; ++columnIndex) {
+      final String columnName = columnNameList.get(columnIndex);
+      if (!insertRowRecordEnrichedByConsumerGroupId(columnName, timestamp, 
consumerGroupId)) {
+        return false;
+      }
+    }
+
+    return true;
   }
 
   /**
    * @return false -> receiver crashed
    */
   private boolean insertRowRecordEnrichedByConsumerGroupId(
-      final String columnName, final RowRecord record, final String 
consumerGroupId)
+      final String columnName, final long timestamp, final String 
consumerGroupId)
       throws Exception {
     if ("root.topic1.s".equals(columnName)) {
       final String sql =
           String.format(
-              "insert into root.%s.topic1(time, s) values (%s, 1)",
-              consumerGroupId, record.getTimestamp());
+              "insert into root.%s.topic1(time, s) values (%s, 1)", 
consumerGroupId, timestamp);
       LOGGER.info(sql);
       return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
-    } else if ("root.topic2.s".equals(columnName)) {
+    }
+
+    if ("root.topic2.s".equals(columnName)) {
       final String sql =
           String.format(
-              "insert into root.%s.topic2(time, s) values (%s, 1)",
-              consumerGroupId, record.getTimestamp());
+              "insert into root.%s.topic2(time, s) values (%s, 3)", 
consumerGroupId, timestamp);
       LOGGER.info(sql);
       return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
-    } else {
-      LOGGER.warn("unexpected column name: {}", columnName);
-      throw new Exception("unexpected column name");
     }
+
+    LOGGER.warn("unexpected column name: {}", columnName);
+    throw new Exception("unexpected column name");
   }
 }

Reply via email to