This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-insert-null-values
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d662b455766312e571df8aa0cf8c9af27130abe0
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 11 21:12:47 2024 +0800

    Pipe IT: DO NOT MERGE
---
 .github/workflows/pipe-it-2cluster.yml               |  2 --
 .../pipe/it/autocreate/IoTDBPipeNullValueIT.java     |  1 +
 .../runtime/PipeHandleLeaderChangeProcedure.java     | 20 ++++++++++----------
 3 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml
index 604d2eca180..804194777ef 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -5,7 +5,6 @@ on:
     branches:
       - master
       - 'rel/1.*'
-      - pipe-meta-sync
     paths-ignore:
       - 'docs/**'
       - 'site/**'
@@ -13,7 +12,6 @@ on:
     branches:
       - master
       - 'rel/1.*'
-      - pipe-meta-sync
     paths-ignore:
       - 'docs/**'
       - 'site/**'
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
index 008765713d0..b640809a928 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java
@@ -216,6 +216,7 @@ public class IoTDBPipeNullValueIT extends AbstractPipeDualAutoIT {
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+      connectorAttributes.put("connector.batch.enable", "false");
 
       if (withParsing) {
         extractorAttributes.put("start-time", "1970-01-01T08:00:00.000+08:00");
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index b04ac43e846..35feee77164 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -48,7 +48,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHandleLeaderChangeProcedure.class);
 
-  private Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap =
+  private Map<TConsensusGroupId, Pair<Integer, Integer>> regionGroupToOldAndNewLeaderPairMap =
       new HashMap<>();
 
   public PipeHandleLeaderChangeProcedure() {
@@ -56,9 +56,9 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
   }
 
   public PipeHandleLeaderChangeProcedure(
-      Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap) {
+      Map<TConsensusGroupId, Pair<Integer, Integer>> regionGroupToOldAndNewLeaderPairMap) {
     super();
-    this.dataRegionGroupToOldAndNewLeaderPairMap = dataRegionGroupToOldAndNewLeaderPairMap;
+    this.regionGroupToOldAndNewLeaderPairMap = regionGroupToOldAndNewLeaderPairMap;
   }
 
   @Override
@@ -87,7 +87,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
 
     final Map<TConsensusGroupId, Integer> newConsensusGroupIdToLeaderConsensusIdMap =
         new HashMap<>();
-    dataRegionGroupToOldAndNewLeaderPairMap.forEach(
+    regionGroupToOldAndNewLeaderPairMap.forEach(
         (regionGroupId, oldNewLeaderPair) ->
             newConsensusGroupIdToLeaderConsensusIdMap.put(
                 regionGroupId, oldNewLeaderPair.getRight()));
@@ -146,9 +146,9 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
     super.serialize(stream);
-    ReadWriteIOUtils.write(dataRegionGroupToOldAndNewLeaderPairMap.size(), stream);
+    ReadWriteIOUtils.write(regionGroupToOldAndNewLeaderPairMap.size(), stream);
     for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry :
-        dataRegionGroupToOldAndNewLeaderPairMap.entrySet()) {
+        regionGroupToOldAndNewLeaderPairMap.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey().getId(), stream);
       ReadWriteIOUtils.write(entry.getValue().getLeft(), stream);
       ReadWriteIOUtils.write(entry.getValue().getRight(), stream);
@@ -163,7 +163,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
       final int dataRegionGroupId = ReadWriteIOUtils.readInt(byteBuffer);
       final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
       final int newDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
-      dataRegionGroupToOldAndNewLeaderPairMap.put(
+      regionGroupToOldAndNewLeaderPairMap.put(
           new TConsensusGroupId(TConsensusGroupType.DataRegion, dataRegionGroupId),
           new Pair<>(oldDataRegionLeaderId, newDataRegionLeaderId));
     }
@@ -181,13 +181,13 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
     return getProcId() == that.getProcId()
         && getCurrentState().equals(that.getCurrentState())
         && getCycles() == that.getCycles()
-        && this.dataRegionGroupToOldAndNewLeaderPairMap.equals(
-            that.dataRegionGroupToOldAndNewLeaderPairMap);
+        && this.regionGroupToOldAndNewLeaderPairMap.equals(
+            that.regionGroupToOldAndNewLeaderPairMap);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-        getProcId(), getCurrentState(), getCycles(), dataRegionGroupToOldAndNewLeaderPairMap);
+        getProcId(), getCurrentState(), getCycles(), regionGroupToOldAndNewLeaderPairMap);
   }
 }

Reply via email to