This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a52ca3822d Pipe IT: Add tests for iotdb-legacy-pipe-sink & Add 
forced-log & file mode tests for real-time mode & Refactor ITs (#11617)
1a52ca3822d is described below

commit 1a52ca3822d852f13042864454d262a243ff68b4
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 27 17:45:54 2023 +0800

    Pipe IT: Add tests for iotdb-legacy-pipe-sink & Add forced-log & file mode 
tests for real-time mode & Refactor ITs (#11617)
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  49 +++
 .../apache/iotdb/pipe/it/IoTDBPipeClusterIT.java   | 189 ++++++++----
 .../pipe/it/IoTDBPipeConnectorParallelIT.java      |  49 +--
 ...ipeDataSyncIT.java => IoTDBPipeDataSinkIT.java} | 132 ++++----
 .../it/{extractor => }/IoTDBPipeExtractorIT.java   | 343 ++++++---------------
 .../apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java |  33 +-
 .../apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java  |  34 +-
 7 files changed, 397 insertions(+), 432 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 8c0965b951a..f7aac47b65f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -411,6 +411,31 @@ public class TestUtils {
     return false;
   }
 
+  // This method will not throw failure given that a failure is encountered.
+  // Instead, it return a flag to indicate the result of the execution.
+  public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, 
List<String> sqlList) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = env.getConnection();
+          Statement statement = connection.createStatement()) {
+        for (String sql : sqlList) {
+          statement.execute(sql);
+        }
+        return true;
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
   public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
       BaseEnv env, DataNodeWrapper wrapper, String sql) {
     for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
@@ -456,6 +481,30 @@ public class TestUtils {
     return false;
   }
 
+  public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
+      BaseEnv env, DataNodeWrapper wrapper, List<String> sqlList) {
+    for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
+      try (Connection connection = 
env.getConnectionWithSpecifiedDataNode(wrapper);
+          Statement statement = connection.createStatement()) {
+        for (String sql : sqlList) {
+          statement.execute(sql);
+        }
+        return true;
+      } catch (SQLException e) {
+        if (retryCountLeft > 0) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException ignored) {
+          }
+        } else {
+          e.printStackTrace();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
   public static void executeQuery(String sql) {
     executeQuery(sql, "root", "root");
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
index 632e74e19eb..8ead12976c2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeClusterIT.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.pipe.it;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
@@ -46,7 +47,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -118,18 +121,14 @@ public class IoTDBPipeClusterIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values 
(2010-01-01T10:00:00+08:00, 1)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values 
(2010-01-02T10:00:00+08:00, 2)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values 
(2010-01-01T10:00:00+08:00, 1)",
+              "insert into root.db.d1(time, s1) values 
(2010-01-02T10:00:00+08:00, 2)",
+              "flush"))) {
         return;
       }
-
       Map<String, String> extractorAttributes = new HashMap<>();
       Map<String, String> processorAttributes = new HashMap<>();
       Map<String, String> connectorAttributes = new HashMap<>();
@@ -167,11 +166,9 @@ public class IoTDBPipeClusterIT {
           "count(root.db.d1.s1),",
           Collections.singleton("1,"));
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (now(), 3)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.db.d1(time, s1) values (now(), 3)", 
"flush"))) {
         return;
       }
 
@@ -213,11 +210,8 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values 
(1, 1)", "flush"))) {
         return;
       }
 
@@ -236,27 +230,33 @@ public class IoTDBPipeClusterIT {
       for (int i = 0; i < 3; ++i) {
         if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
           leaderIndex = i;
-          senderEnv.shutdownDataNode(i);
+          try {
+            senderEnv.shutdownDataNode(i);
+          } catch (Exception e) {
+            e.printStackTrace();
+            return;
+          }
           try {
             TimeUnit.SECONDS.sleep(1);
           } catch (InterruptedException ignored) {
           }
-          senderEnv.startDataNode(i);
-          ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+          try {
+            senderEnv.startDataNode(i);
+            ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+          } catch (Exception e) {
+            e.printStackTrace();
+            return;
+          }
         }
       }
       if (leaderIndex == -1) { // ensure the leader is stopped
         fail();
       }
 
-      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
+      if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
           senderEnv,
           senderEnv.getDataNodeWrapper(leaderIndex),
-          "insert into root.db.d1(time, s1) values (2, 2)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
-          senderEnv, senderEnv.getDataNodeWrapper(leaderIndex), "flush")) {
+          Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)", 
"flush"))) {
         return;
       }
 
@@ -267,8 +267,13 @@ public class IoTDBPipeClusterIT {
           Collections.singleton("2,"));
     }
 
-    TestUtils.restartCluster(senderEnv);
-    TestUtils.restartCluster(receiverEnv);
+    try {
+      TestUtils.restartCluster(senderEnv);
+      TestUtils.restartCluster(receiverEnv);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
@@ -294,11 +299,8 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p2").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d2(time, s1) values 
(1, 1)", "flush"))) {
         return;
       }
 
@@ -340,23 +342,23 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values 
(1, 1)", "flush"))) {
         return;
       }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+
+      try {
+        senderEnv.registerNewDataNode(true);
+      } catch (Exception e) {
+        e.printStackTrace();
         return;
       }
-
-      senderEnv.registerNewDataNode(true);
       DataNodeWrapper newDataNode =
           
senderEnv.getDataNodeWrapper(senderEnv.getDataNodeWrapperList().size() - 1);
-      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
-          senderEnv, newDataNode, "insert into root.db.d1(time, s1) values (2, 
2)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryOnSpecifiedDataNodeWithRetry(
-          senderEnv, newDataNode, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
+          senderEnv,
+          newDataNode,
+          Arrays.asList("insert into root.db.d1(time, s1) values (2, 2)", 
"flush"))) {
         return;
       }
       TestUtils.assertDataOnEnv(
@@ -366,8 +368,13 @@ public class IoTDBPipeClusterIT {
           Collections.singleton("2,"));
     }
 
-    TestUtils.restartCluster(senderEnv);
-    TestUtils.restartCluster(receiverEnv);
+    try {
+      TestUtils.restartCluster(senderEnv);
+      TestUtils.restartCluster(receiverEnv);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
@@ -393,11 +400,8 @@ public class IoTDBPipeClusterIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p2").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d2(time, s1) values (1, 1)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d2(time, s1) values 
(1, 1)", "flush"))) {
         return;
       }
 
@@ -438,8 +442,9 @@ public class IoTDBPipeClusterIT {
                             .setExtractorAttributes(extractorAttributes)
                             .setProcessorAttributes(processorAttributes));
                   } catch (TException e) {
+                    // Not sure if the "createPipe" has succeeded
                     e.printStackTrace();
-                    fail(e.getMessage());
+                    return;
                   }
                   try {
                     Thread.sleep(100);
@@ -448,7 +453,12 @@ public class IoTDBPipeClusterIT {
                 }
               });
       t.start();
-      senderEnv.registerNewDataNode(true);
+      try {
+        senderEnv.registerNewDataNode(true);
+      } catch (Exception e) {
+        e.printStackTrace();
+        return;
+      }
       t.join();
     }
 
@@ -504,7 +514,12 @@ public class IoTDBPipeClusterIT {
                 }
               });
       t.start();
-      senderEnv.registerNewDataNode(true);
+      try {
+        senderEnv.registerNewDataNode(true);
+      } catch (Exception e) {
+        e.printStackTrace();
+        return;
+      }
       t.join();
 
       TestUtils.assertDataOnEnv(
@@ -513,8 +528,12 @@ public class IoTDBPipeClusterIT {
           "count(root.db.d1.s1),",
           Collections.singleton(succeedNum.get() + ","));
 
-      senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
-      
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
+      try {
+        senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
+        
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }
   }
 
@@ -554,7 +573,12 @@ public class IoTDBPipeClusterIT {
         }
       }
 
-      senderEnv.registerNewDataNode(true);
+      try {
+        senderEnv.registerNewDataNode(true);
+      } catch (Exception e) {
+        e.printStackTrace();
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -562,8 +586,12 @@ public class IoTDBPipeClusterIT {
           "count(root.db.d1.s1),",
           Collections.singleton(succeedNum + ","));
 
-      senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
-      
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
+      try {
+        senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
+        
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }
   }
 
@@ -609,11 +637,16 @@ public class IoTDBPipeClusterIT {
         }
       }
 
-      senderEnv.registerNewDataNode(false);
-      senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
-      senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
-      
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
-      ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+      try {
+        senderEnv.registerNewDataNode(false);
+        senderEnv.startDataNode(senderEnv.getDataNodeWrapperList().size() - 1);
+        senderEnv.shutdownDataNode(senderEnv.getDataNodeWrapperList().size() - 
1);
+        
senderEnv.getDataNodeWrapperList().remove(senderEnv.getDataNodeWrapperList().size()
 - 1);
+        ((AbstractEnv) senderEnv).testWorkingNoUnknown();
+      } catch (Exception e) {
+        e.printStackTrace();
+        return;
+      }
 
       TestUtils.assertDataOnEnv(
           receiverEnv,
@@ -667,7 +700,12 @@ public class IoTDBPipeClusterIT {
       return;
     }
 
-    TestUtils.restartCluster(senderEnv);
+    try {
+      TestUtils.restartCluster(senderEnv);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
 
     TestUtils.assertDataOnEnv(
         receiverEnv,
@@ -708,7 +746,12 @@ public class IoTDBPipeClusterIT {
                   if (status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                     successCount.updateAndGet(v -> v + 1);
                   }
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                } catch (TException | ClientManagerException | IOException e) {
+                  e.printStackTrace();
                 } catch (Exception e) {
+                  // Fail iff pipe exception occurs
                   e.printStackTrace();
                   fail(e.getMessage());
                 }
@@ -733,7 +776,12 @@ public class IoTDBPipeClusterIT {
                   if (status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                     successCount.updateAndGet(v -> v + 1);
                   }
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                } catch (TException | ClientManagerException | IOException e) {
+                  e.printStackTrace();
                 } catch (Exception e) {
+                  // Fail iff pipe exception occurs
                   e.printStackTrace();
                   fail(e.getMessage());
                 }
@@ -798,7 +846,12 @@ public class IoTDBPipeClusterIT {
                               .setProcessorAttributes(processorAttributes));
                   Assert.assertEquals(
                       TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                } catch (TException | ClientManagerException | IOException e) {
+                  e.printStackTrace();
                 } catch (Exception e) {
+                  // Fail iff pipe exception occurs
                   e.printStackTrace();
                   fail(e.getMessage());
                 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
index c8244cf0150..271957d61f9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeConnectorParallelIT.java
@@ -37,17 +37,11 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
@@ -105,14 +99,13 @@ public class IoTDBPipeConnectorParallelIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
 
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.sg1.d1(time, s1) values (0, 1)");
-        statement.execute("insert into root.sg1.d1(time, s1) values (1, 2)");
-        statement.execute("insert into root.sg1.d1(time, s1) values (2, 3)");
-        statement.execute("insert into root.sg1.d1(time, s1) values (3, 4)");
-      } catch (SQLException e) {
-        e.printStackTrace();
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.sg1.d1(time, s1) values (0, 1)",
+              "insert into root.sg1.d1(time, s1) values (1, 2)",
+              "insert into root.sg1.d1(time, s1) values (2, 3)",
+              "insert into root.sg1.d1(time, s1) values (3, 4)"))) {
         return;
       }
 
@@ -120,30 +113,8 @@ public class IoTDBPipeConnectorParallelIT {
       expectedResSet.add("1,2.0,");
       expectedResSet.add("2,3.0,");
       expectedResSet.add("3,4.0,");
-      assertDataOnReceiver(receiverEnv, expectedResSet);
-      assertDataOnReceiver(receiverEnv, expectedResSet);
-    }
-  }
-
-  private void assertDataOnReceiver(BaseEnv receiverEnv, Set<String> 
expectedResSet) {
-    try (Connection connection = receiverEnv.getConnection();
-        Statement statement = connection.createStatement()) {
-      await()
-          .atMost(600, TimeUnit.SECONDS)
-          .untilAsserted(
-              () -> {
-                try {
-                  TestUtils.assertResultSetEqual(
-                      statement.executeQuery("select * from root.**"),
-                      "Time,root.sg1.d1.s1,",
-                      expectedResSet);
-                } catch (Exception e) {
-                  Assert.fail();
-                }
-              });
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
+      TestUtils.assertDataOnEnv(
+          receiverEnv, "select * from root.**", "Time,root.sg1.d1.s1,", 
expectedResSet);
     }
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
similarity index 63%
rename from 
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
index 3c235764975..52499cd14d4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSinkIT.java
@@ -37,20 +37,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2.class})
-public class IoTDBPipeDataSyncIT {
+public class IoTDBPipeDataSinkIT {
 
   private BaseEnv senderEnv;
   private BaseEnv receiverEnv;
@@ -75,7 +69,7 @@ public class IoTDBPipeDataSyncIT {
   }
 
   @Test
-  public void testEnv() throws Exception {
+  public void testThriftConnector() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     String receiverIp = receiverDataNode.getIp();
@@ -107,34 +101,65 @@ public class IoTDBPipeDataSyncIT {
 
       // Do not fail if the failure has nothing to do with pipe
       // Because the failures will randomly generate due to resource limitation
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("insert into root.vehicle.d0(time, s1) values (0, 
1)");
-      } catch (SQLException e) {
-        e.printStackTrace();
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) {
         return;
       }
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select * from root.**"),
-                        "Time,root.vehicle.d0.s1,",
-                        Collections.singleton("0,1.0,"));
-                  } catch (Exception e) {
-                    // Handle the exception generated during "executeQuery"
-                    Assert.fail();
-                  }
-                });
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.singleton("0,1.0,"));
+    }
+  }
+
+  @Test
+  public void testLegacyConnector() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("source.realtime.mode", "log");
+
+      connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
+      connectorAttributes.put("sink.batch.enable", "false");
+      connectorAttributes.put("sink.ip", receiverIp);
+      connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+
+      // This version does not matter since it's no longer checked by the 
legacy receiver
+      connectorAttributes.put("sink.version", "1.3");
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("testPipe", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.vehicle.d0(time, s1) values (0, 1)")) {
+        return;
       }
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.singleton("0,1.0,"));
     }
   }
 
@@ -166,37 +191,20 @@ public class IoTDBPipeDataSyncIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("create aligned timeseries root.sg.d1(s0 float, s1 
float)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
-
-      try (Connection connection = senderEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.execute("create aligned timeseries root.sg.d1(s0 float, s1 
float)");
-        statement.execute("insert into root.sg.d1(time, s0, s1) values (3, 
null, 25.34)");
-      } catch (SQLException e) {
-        e.printStackTrace();
-        fail(e.getMessage());
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          receiverEnv,
+          Arrays.asList(
+              "create aligned timeseries root.sg.d1(s0 float, s1 float)",
+              "create aligned timeseries root.sg.d1(s0 float, s1 float)",
+              "insert into root.sg.d1(time, s0, s1) values (3, null, 
25.34)"))) {
+        return;
       }
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () ->
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select * from root.**"),
-                        "Time,root.sg.d1.s0,root.sg.d1.s1,",
-                        Collections.singleton("3,null,25.34,")));
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.sg.d1.s0,root.sg.d1.s1,",
+          Collections.singleton("3,null,25.34,"));
     }
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
similarity index 65%
rename from 
integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
index e7aa84fd681..a8f0828bfa9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/extractor/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeExtractorIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.pipe.it.extractor;
+package org.apache.iotdb.pipe.it;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -48,10 +48,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
@@ -211,53 +208,23 @@ public class IoTDBPipeExtractorIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.nonAligned.1TS (time, s_float) values 
(now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.nonAligned.100TS (time, s_float) values 
(now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.nonAligned.1000TS (time, s_float) 
values (now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.nonAligned.`1(TS)` (time, s_float) 
values (now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
-          "insert into root.nonAligned.6TS.`6` ("
-              + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, 
`s_text(1)`, `s_bool(1)`) "
-              + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.aligned.1TS (time, s_float) aligned 
values (now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv,
-          "insert into root.aligned.100TS (time, s_float) aligned values 
(now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv,
-          "insert into root.aligned.1000TS (time, s_float) aligned values 
(now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv,
-          "insert into root.aligned.`1(TS)` (time, s_float) aligned values 
(now(), 0.5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv,
-          "insert into root.aligned.6TS.`6` ("
-              + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, `s_long(1)`, 
`s_text(1)`, `s_bool(1)`) "
-              + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", true)")) {
+          Arrays.asList(
+              "insert into root.nonAligned.1TS (time, s_float) values (now(), 
0.5)",
+              "insert into root.nonAligned.100TS (time, s_float) values 
(now(), 0.5)",
+              "insert into root.nonAligned.1000TS (time, s_float) values 
(now(), 0.5)",
+              "insert into root.nonAligned.`1(TS)` (time, s_float) values 
(now(), 0.5)",
+              "insert into root.nonAligned.6TS.`6` ("
+                  + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, 
`s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+                  + "values (now(), 0.5, 1, 1.5, 2, \"text1\", true)",
+              "insert into root.aligned.1TS (time, s_float) aligned values 
(now(), 0.5)",
+              "insert into root.aligned.100TS (time, s_float) aligned values 
(now(), 0.5)",
+              "insert into root.aligned.1000TS (time, s_float) aligned values 
(now(), 0.5)",
+              "insert into root.aligned.`1(TS)` (time, s_float) aligned values 
(now(), 0.5)",
+              "insert into root.aligned.6TS.`6` ("
+                  + "time, `s_float(1)`, `s_int(1)`, `s_double(1)`, 
`s_long(1)`, `s_text(1)`, `s_bool(1)`) "
+                  + "aligned values (now(), 0.5, 1, 1.5, 2, \"text1\", 
true)"))) {
         return;
       }
 
@@ -304,36 +271,22 @@ public class IoTDBPipeExtractorIT {
         assertTimeseriesCountOnReceiver(receiverEnv, 
expectedTimeseriesCount.get(i));
       }
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        Set<String> expectedDevices = new HashSet<>();
-        expectedDevices.add("root.nonAligned.1TS,false,");
-        expectedDevices.add("root.nonAligned.100TS,false,");
-        expectedDevices.add("root.nonAligned.1000TS,false,");
-        expectedDevices.add("root.nonAligned.`1(TS)`,false,");
-        expectedDevices.add("root.nonAligned.6TS.`6`,false,");
-        expectedDevices.add("root.aligned.1TS,true,");
-        expectedDevices.add("root.aligned.100TS,true,");
-        expectedDevices.add("root.aligned.1000TS,true,");
-        expectedDevices.add("root.aligned.`1(TS)`,true,");
-        expectedDevices.add("root.aligned.6TS.`6`,true,");
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("show devices"),
-                        "Device,IsAligned,",
-                        expectedDevices);
-                  } catch (Exception e) {
-                    Assert.fail();
-                  }
-                });
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "show devices",
+          "Device,IsAligned,",
+          new HashSet<>(
+              Arrays.asList(
+                  "root.nonAligned.1TS,false,",
+                  "root.nonAligned.100TS,false,",
+                  "root.nonAligned.1000TS,false,",
+                  "root.nonAligned.`1(TS)`,false,",
+                  "root.nonAligned.6TS.`6`,false,",
+                  "root.aligned.1TS,true,",
+                  "root.aligned.100TS,true,",
+                  "root.aligned.1000TS,true,",
+                  "root.aligned.`1(TS)`,true,",
+                  "root.aligned.6TS.`6`,true,")));
     }
   }
 
@@ -367,15 +320,12 @@ public class IoTDBPipeExtractorIT {
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
       assertTimeseriesCountOnReceiver(receiverEnv, 0);
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db1.d1 (time, at1) values (1, 10)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db2.d1 (time, at1) values (1, 20)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db1.d1 (time, at1) values (1, 10)",
+              "insert into root.db2.d1 (time, at1) values (1, 20)",
+              "flush"))) {
         return;
       }
 
@@ -395,15 +345,12 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("p2").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db1.d1 (time, at1) values (2, 11)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db2.d1 (time, at1) values (2, 21)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db1.d1 (time, at1) values (2, 11)",
+              "insert into root.db2.d1 (time, at1) values (2, 21)",
+              "flush"))) {
         return;
       }
 
@@ -417,25 +364,11 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p3").getCode());
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select count(*) from root.**"),
-                        "count(root.db1.d1.at1),count(root.db2.d1.at1),",
-                        Collections.singleton("2,2,"));
-                  } catch (Exception e) {
-                    Assert.fail();
-                  }
-                });
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db1.d1.at1),count(root.db2.d1.at1),",
+          Collections.singleton("2,2,"));
     }
   }
 
@@ -448,23 +381,14 @@ public class IoTDBPipeExtractorIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1 (time, at1) values (1, 10)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d2 (time, at1) values (1, 20)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d3 (time, at1) values (1, 30)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d4 (time, at1) values (1, 40)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1 (time, at1) values (1, 10)",
+              "insert into root.db.d2 (time, at1) values (1, 20)",
+              "insert into root.db.d3 (time, at1) values (1, 30)",
+              "insert into root.db.d4 (time, at1) values (1, 40)",
+              "flush"))) {
         return;
       }
 
@@ -524,55 +448,26 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p4").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1 (time, at1) values (2, 11)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d2 (time, at1) values (2, 21)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d3 (time, at1) values (2, 31)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d4 (time, at1) values (2, 41)")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1 (time, at1) values (2, 11)",
+              "insert into root.db.d2 (time, at1) values (2, 21)",
+              "insert into root.db.d3 (time, at1) values (2, 31)",
+              "insert into root.db.d4 (time, at1) values (2, 41)"))) {
         return;
       }
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select count(*) from root.** 
where time <= 1"),
-                        
"count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
-                        Collections.singleton("1,0,1,"));
-                  } catch (Exception e) {
-                    Assert.fail();
-                  }
-                });
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select count(*) from root.** 
where time >= 2"),
-                        
"count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
-                        Collections.singleton("1,1,0,"));
-                  } catch (Exception e) {
-                    Assert.fail();
-                  }
-                });
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.** where time <= 1",
+          "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
+          Collections.singleton("1,0,1,"));
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.** where time >= 2",
+          "count(root.db.d4.at1),count(root.db.d2.at1),count(root.db.d3.at1),",
+          Collections.singleton("1,1,0,"));
     }
   }
 
@@ -585,19 +480,14 @@ public class IoTDBPipeExtractorIT {
 
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv,
-          "insert into root.db.d1 (time, at1)"
-              + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 
5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
-          "insert into root.db.d2 (time, at1)"
-              + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 
5)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+          Arrays.asList(
+              "insert into root.db.d1 (time, at1)"
+                  + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), 
(5000, 5)",
+              "insert into root.db.d2 (time, at1)"
+                  + " values (1000, 1), (2000, 2), (3000, 3), (4000, 4), 
(5000, 5)",
+              "flush"))) {
         return;
       }
 
@@ -624,25 +514,11 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select count(*) from root.**"),
-                        "count(root.db.d1.at1),",
-                        Collections.singleton("3,"));
-                  } catch (Exception e) {
-                    Assert.fail();
-                  }
-                });
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.at1),",
+          Collections.singleton("3,"));
 
       extractorAttributes.remove("extractor.pattern");
       status =
@@ -654,47 +530,16 @@ public class IoTDBPipeExtractorIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p2").getCode());
 
-      try (Connection connection = receiverEnv.getConnection();
-          Statement statement = connection.createStatement()) {
-        await()
-            .atMost(600, TimeUnit.SECONDS)
-            .untilAsserted(
-                () -> {
-                  try {
-                    TestUtils.assertResultSetEqual(
-                        statement.executeQuery("select count(*) from root.**"),
-                        "count(root.db.d1.at1),count(root.db.d2.at1),",
-                        Collections.singleton("3,3,"));
-                  } catch (Exception e) {
-                    Assert.fail();
-                  }
-                });
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.at1),count(root.db.d2.at1),",
+          Collections.singleton("3,3,"));
     }
   }
 
   private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) 
{
-    try (Connection connection = receiverEnv.getConnection();
-        Statement statement = connection.createStatement()) {
-      await()
-          .atMost(600, TimeUnit.SECONDS)
-          .untilAsserted(
-              () -> {
-                try {
-                  TestUtils.assertResultSetEqual(
-                      statement.executeQuery("count timeseries"),
-                      "count(timeseries),",
-                      Collections.singleton(count + ","));
-                } catch (Exception e) {
-                  Assert.fail();
-                }
-              });
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
+    TestUtils.assertDataOnEnv(
+        receiverEnv, "count timeseries", "count(timeseries),", 
Collections.singleton(count + ","));
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
index 166e0a5fd2c..2cd8acc60c2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeLifeCycleIT.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -149,11 +150,8 @@ public class IoTDBPipeLifeCycleIT {
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
-        return;
-      }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values 
(1, 1)", "flush"))) {
         return;
       }
 
@@ -447,8 +445,13 @@ public class IoTDBPipeLifeCycleIT {
           receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
     }
 
-    TestUtils.restartCluster(senderEnv);
-    TestUtils.restartCluster(receiverEnv);
+    try {
+      TestUtils.restartCluster(senderEnv);
+      TestUtils.restartCluster(receiverEnv);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
 
     try (SyncConfigNodeIServiceClient ignored =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
@@ -511,7 +514,12 @@ public class IoTDBPipeLifeCycleIT {
               });
       t.start();
 
-      TestUtils.restartCluster(receiverEnv);
+      try {
+        TestUtils.restartCluster(receiverEnv);
+      } catch (Exception e) {
+        e.printStackTrace();
+        return;
+      }
       t.join();
 
       TestUtils.assertDataOnEnv(
@@ -680,8 +688,13 @@ public class IoTDBPipeLifeCycleIT {
     TestUtils.assertDataOnEnv(
         receiverEnv, "select * from root.**", "Time,root.db.d1.s1,", 
expectedResSet);
 
-    TestUtils.restartCluster(senderEnv);
-    TestUtils.restartCluster(receiverEnv);
+    try {
+      TestUtils.restartCluster(senderEnv);
+      TestUtils.restartCluster(receiverEnv);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
 
     for (int i = 400; i < 500; ++i) {
       if (!TestUtils.tryExecuteNonQueryWithRetry(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
index 1290e46b94a..6bcaee39b56 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeProtocolIT.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -402,6 +403,9 @@ public class IoTDBPipeProtocolIT {
       connectorAttributes.put("connector.batch.enable", "false");
       connectorAttributes.put("connector.node-urls", 
nodeUrlsBuilder.toString());
 
+      // Test forced-log mode, in TimechoDB this might be "file"
+      extractorAttributes.put("source.realtime.mode", "forced-log");
+
       TSStatus status =
           client.createPipe(
               new TCreatePipeReq("p1", connectorAttributes)
@@ -413,11 +417,33 @@ public class IoTDBPipeProtocolIT {
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
 
-      if (!TestUtils.tryExecuteNonQueryWithRetry(
-          senderEnv, "insert into root.db.d1(time, s1) values (2, 2)")) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values 
(2, 2)", "flush"))) {
         return;
       }
-      if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
+
+      TestUtils.assertDataOnEnv(
+          receiverEnv,
+          "select count(*) from root.**",
+          "count(root.db.d1.s1),",
+          Collections.singleton("2,"));
+
+      // Test file mode
+      extractorAttributes.replace("source.realtime.mode", "file");
+
+      status =
+          client.createPipe(
+              new TCreatePipeReq("p2", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      System.out.println(status.getMessage());
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p2").getCode());
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv, Arrays.asList("insert into root.db.d1(time, s1) values 
(3, 3)", "flush"))) {
         return;
       }
 
@@ -425,7 +451,7 @@ public class IoTDBPipeProtocolIT {
           receiverEnv,
           "select count(*) from root.**",
           "count(root.db.d1.s1),",
-          Collections.singleton("2,"));
+          Collections.singleton("3,"));
     }
   }
 }

Reply via email to