This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-insert-null-values in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d662b455766312e571df8aa0cf8c9af27130abe0 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Mar 11 21:12:47 2024 +0800 Pipe IT: DO NOT MERGE --- .github/workflows/pipe-it-2cluster.yml | 2 -- .../pipe/it/autocreate/IoTDBPipeNullValueIT.java | 1 + .../runtime/PipeHandleLeaderChangeProcedure.java | 20 ++++++++++---------- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index 604d2eca180..804194777ef 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -5,7 +5,6 @@ on: branches: - master - 'rel/1.*' - - pipe-meta-sync paths-ignore: - 'docs/**' - 'site/**' @@ -13,7 +12,6 @@ on: branches: - master - 'rel/1.*' - - pipe-meta-sync paths-ignore: - 'docs/**' - 'site/**' diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java index 008765713d0..b640809a928 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeNullValueIT.java @@ -216,6 +216,7 @@ public class IoTDBPipeNullValueIT extends AbstractPipeDualAutoIT { connectorAttributes.put("connector", "iotdb-thrift-connector"); connectorAttributes.put("connector.ip", receiverIp); connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + connectorAttributes.put("connector.batch.enable", "false"); if (withParsing) { extractorAttributes.put("start-time", "1970-01-01T08:00:00.000+08:00"); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index b04ac43e846..35feee77164 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -48,7 +48,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur private static final Logger LOGGER = LoggerFactory.getLogger(PipeHandleLeaderChangeProcedure.class); - private Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap = + private Map<TConsensusGroupId, Pair<Integer, Integer>> regionGroupToOldAndNewLeaderPairMap = new HashMap<>(); public PipeHandleLeaderChangeProcedure() { @@ -56,9 +56,9 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur } public PipeHandleLeaderChangeProcedure( - Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap) { + Map<TConsensusGroupId, Pair<Integer, Integer>> regionGroupToOldAndNewLeaderPairMap) { super(); - this.dataRegionGroupToOldAndNewLeaderPairMap = dataRegionGroupToOldAndNewLeaderPairMap; + this.regionGroupToOldAndNewLeaderPairMap = regionGroupToOldAndNewLeaderPairMap; } @Override @@ -87,7 +87,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur final Map<TConsensusGroupId, Integer> newConsensusGroupIdToLeaderConsensusIdMap = new HashMap<>(); - dataRegionGroupToOldAndNewLeaderPairMap.forEach( + regionGroupToOldAndNewLeaderPairMap.forEach( (regionGroupId, oldNewLeaderPair) -> newConsensusGroupIdToLeaderConsensusIdMap.put( regionGroupId, oldNewLeaderPair.getRight())); @@ -146,9 +146,9 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur public void serialize(DataOutputStream stream) throws IOException { stream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode()); super.serialize(stream); - ReadWriteIOUtils.write(dataRegionGroupToOldAndNewLeaderPairMap.size(), stream); + ReadWriteIOUtils.write(regionGroupToOldAndNewLeaderPairMap.size(), stream); for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry : - dataRegionGroupToOldAndNewLeaderPairMap.entrySet()) { + regionGroupToOldAndNewLeaderPairMap.entrySet()) { ReadWriteIOUtils.write(entry.getKey().getId(), stream); ReadWriteIOUtils.write(entry.getValue().getLeft(), stream); ReadWriteIOUtils.write(entry.getValue().getRight(), stream); @@ -163,7 +163,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur final int dataRegionGroupId = ReadWriteIOUtils.readInt(byteBuffer); final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer); final int newDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer); - dataRegionGroupToOldAndNewLeaderPairMap.put( + regionGroupToOldAndNewLeaderPairMap.put( new TConsensusGroupId(TConsensusGroupType.DataRegion, dataRegionGroupId), new Pair<>(oldDataRegionLeaderId, newDataRegionLeaderId)); } @@ -181,13 +181,13 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur return getProcId() == that.getProcId() && getCurrentState().equals(that.getCurrentState()) && getCycles() == that.getCycles() - && this.dataRegionGroupToOldAndNewLeaderPairMap.equals( - that.dataRegionGroupToOldAndNewLeaderPairMap); + && this.regionGroupToOldAndNewLeaderPairMap.equals( + that.regionGroupToOldAndNewLeaderPairMap); } @Override public int hashCode() { return Objects.hash( - getProcId(), getCurrentState(), getCycles(), dataRegionGroupToOldAndNewLeaderPairMap); + getProcId(), getCurrentState(), getCycles(), regionGroupToOldAndNewLeaderPairMap); } }
