This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new b301184f07a Subscription IT: fix data insertion logic in SubscriptionConsumerGroupIT (#12863)
b301184f07a is described below
commit b301184f07a5406ef9614d3d7efbe4dcf8a18d7e
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 8 17:23:52 2024 +0800
Subscription IT: fix data insertion logic in SubscriptionConsumerGroupIT (#12863)
---
.../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 48 +++++++++++++---------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index e3325749250..fffca035ec5 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -954,7 +954,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
while (dataSet.hasNext()) {
final RowRecord record = dataSet.next();
if
(!insertRowRecordEnrichedByConsumerGroupId(
- columnNameList, record,
consumerGroupId)) {
+ columnNameList, record.getTimestamp(),
consumerGroupId)) {
receiverCrashed.set(true);
throw new RuntimeException("detect
receiver crashed");
}
@@ -975,12 +975,12 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
null));
while (dataSet.hasNext()) {
final RowRecord record = dataSet.next();
- if
(!insertRowRecordEnrichedByConsumerGroupId(
- dataSet.getPaths().get(0).toString(),
- record,
- consumerGroupId)) {
- receiverCrashed.set(true);
- throw new RuntimeException("detect
receiver crashed");
+ for (final Path path : dataSet.getPaths()) {
+ if
(!insertRowRecordEnrichedByConsumerGroupId(
+ path.toString(),
record.getTimestamp(), consumerGroupId)) {
+ receiverCrashed.set(true);
+ throw new RuntimeException("detect
receiver crashed");
+ }
}
}
}
@@ -1050,39 +1050,47 @@ public class IoTDBSubscriptionConsumerGroupIT extends AbstractSubscriptionDualIT
* @return false -> receiver crashed
*/
private boolean insertRowRecordEnrichedByConsumerGroupId(
- final List<String> columnNameList, final RowRecord record, final String
consumerGroupId)
+ final List<String> columnNameList, final long timestamp, final String
consumerGroupId)
throws Exception {
- if (columnNameList.size() != 2) {
+ final int columnSize = columnNameList.size();
+ if (columnSize <= 1) { // only with time column
LOGGER.warn("unexpected column name list: {}", columnNameList);
throw new Exception("unexpected column name list");
}
- final String columnName = columnNameList.get(1);
- return insertRowRecordEnrichedByConsumerGroupId(columnName, record,
consumerGroupId);
+
+ for (int columnIndex = 1; columnIndex < columnSize; ++columnIndex) {
+ final String columnName = columnNameList.get(columnIndex);
+ if (!insertRowRecordEnrichedByConsumerGroupId(columnName, timestamp,
consumerGroupId)) {
+ return false;
+ }
+ }
+
+ return true;
}
/**
* @return false -> receiver crashed
*/
private boolean insertRowRecordEnrichedByConsumerGroupId(
- final String columnName, final RowRecord record, final String
consumerGroupId)
+ final String columnName, final long timestamp, final String
consumerGroupId)
throws Exception {
if ("root.topic1.s".equals(columnName)) {
final String sql =
String.format(
- "insert into root.%s.topic1(time, s) values (%s, 1)",
- consumerGroupId, record.getTimestamp());
+ "insert into root.%s.topic1(time, s) values (%s, 1)",
consumerGroupId, timestamp);
LOGGER.info(sql);
return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
- } else if ("root.topic2.s".equals(columnName)) {
+ }
+
+ if ("root.topic2.s".equals(columnName)) {
final String sql =
String.format(
- "insert into root.%s.topic2(time, s) values (%s, 1)",
- consumerGroupId, record.getTimestamp());
+ "insert into root.%s.topic2(time, s) values (%s, 3)",
consumerGroupId, timestamp);
LOGGER.info(sql);
return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
- } else {
- LOGGER.warn("unexpected column name: {}", columnName);
- throw new Exception("unexpected column name");
}
+
+ LOGGER.warn("unexpected column name: {}", columnName);
+ throw new Exception("unexpected column name");
}
}