This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 29fab685489 Pipe IT: Added flush on
IoTDBPipeExtractorIT#testExtractorPatternMatch() to avoid pipe on previous
follower getting unclosed historical tsfile from the old leader (#12534)
29fab685489 is described below
commit 29fab68548970d195a3cb18e0031ba115f216cb0
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 17 13:26:00 2024 +0800
Pipe IT: Added flush on IoTDBPipeExtractorIT#testExtractorPatternMatch() to
avoid pipe on previous follower getting unclosed historical tsfile from the old
leader (#12534)
---
.../pipe/it/autocreate/IoTDBPipeExtractorIT.java | 36 +++++++++++++---------
1 file changed, 22 insertions(+), 14 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index ba2e8021267..541d628aa7e 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -78,7 +78,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p1_1);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -98,7 +98,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p1_2);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -118,7 +118,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p1_3);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -139,7 +139,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p1_4);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -160,7 +160,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p1_5);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -185,7 +185,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p2_1);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -204,7 +204,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p2_2);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -224,7 +224,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p2_3);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -244,7 +244,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p2_4);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -270,7 +270,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(p3_1);
- } catch (SQLException e) {
+ } catch (final SQLException e) {
fail(e.getMessage());
}
@@ -305,7 +305,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
final Statement statement = connection.createStatement()) {
statement.execute(String.format(formatString, invalidStartTime));
fail();
- } catch (SQLException ignored) {
+ } catch (final SQLException ignored) {
}
}
assertPipeCount(0);
@@ -328,7 +328,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
final Statement statement = connection.createStatement()) {
statement.execute(p2);
fail();
- } catch (SQLException ignored) {
+ } catch (final SQLException ignored) {
}
assertPipeCount(0);
@@ -351,7 +351,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
final Statement statement = connection.createStatement()) {
statement.execute(p3);
fail();
- } catch (SQLException ignored) {
+ } catch (final SQLException ignored) {
}
assertPipeCount(0);
@@ -372,7 +372,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
final Statement statement = connection.createStatement()) {
statement.execute(p4);
fail();
- } catch (SQLException ignored) {
+ } catch (final SQLException ignored) {
}
assertPipeCount(0);
}
@@ -447,6 +447,14 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p"
+ i).getCode());
+ // We add flush here because the pipe may be created on the new IoT
leader
+ // and the old leader's data may come as an unclosed historical tsfile
+ // and is skipped flush when the pipe starts. In this case, the
"waitForTsFileClose()"
+ // may not return until a flush is executed, namely the data transfer
relies
+ // on a flush operation.
+ if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+ return;
+ }
assertTimeseriesCountOnReceiver(receiverEnv,
expectedTimeseriesCount.get(i));
}