This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 29fab685489 Pipe IT: Added flush on 
IoTDBPipeExtractorIT#testExtractorPatternMatch() to avoid pipe on previous 
follower getting unclosed historical tsfile from the old leader (#12534)
29fab685489 is described below

commit 29fab68548970d195a3cb18e0031ba115f216cb0
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 17 13:26:00 2024 +0800

    Pipe IT: Added flush on IoTDBPipeExtractorIT#testExtractorPatternMatch() to 
avoid pipe on previous follower getting unclosed historical tsfile from the old 
leader (#12534)
---
 .../pipe/it/autocreate/IoTDBPipeExtractorIT.java   | 36 +++++++++++++---------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index ba2e8021267..541d628aa7e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -78,7 +78,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p1_1);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -98,7 +98,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p1_2);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -118,7 +118,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p1_3);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -139,7 +139,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p1_4);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -160,7 +160,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p1_5);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -185,7 +185,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p2_1);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -204,7 +204,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p2_2);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -224,7 +224,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p2_3);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -244,7 +244,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p2_4);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -270,7 +270,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(p3_1);
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       fail(e.getMessage());
     }
 
@@ -305,7 +305,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
           final Statement statement = connection.createStatement()) {
         statement.execute(String.format(formatString, invalidStartTime));
         fail();
-      } catch (SQLException ignored) {
+      } catch (final SQLException ignored) {
       }
     }
     assertPipeCount(0);
@@ -328,7 +328,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
         final Statement statement = connection.createStatement()) {
       statement.execute(p2);
       fail();
-    } catch (SQLException ignored) {
+    } catch (final SQLException ignored) {
     }
     assertPipeCount(0);
 
@@ -351,7 +351,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
         final Statement statement = connection.createStatement()) {
       statement.execute(p3);
       fail();
-    } catch (SQLException ignored) {
+    } catch (final SQLException ignored) {
     }
     assertPipeCount(0);
 
@@ -372,7 +372,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
         final Statement statement = connection.createStatement()) {
       statement.execute(p4);
       fail();
-    } catch (SQLException ignored) {
+    } catch (final SQLException ignored) {
     }
     assertPipeCount(0);
   }
@@ -447,6 +447,14 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
         Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
         Assert.assertEquals(
             TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p" 
+ i).getCode());
+        // We add a flush here because the pipe may be created on the new IoT leader
+        // and the old leader's data may arrive as an unclosed historical tsfile
+        // whose flush is skipped when the pipe starts. In this case,
+        // "waitForTsFileClose()" may not return until a flush is executed; that is,
+        // the data transfer relies on a flush operation.
+        if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+          return;
+        }
         assertTimeseriesCountOnReceiver(receiverEnv, 
expectedTimeseriesCount.get(i));
       }
 

Reply via email to