This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch cp-fix-data-lost in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 460bed0fe2de09eb6b56fe825a210c8af39b74c8 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Sep 18 15:06:54 2024 +0800 Pipe: Fix data lost when syncing between clusters cause by senders' data region leader change (#13532) (cherry picked from commit 6b71f769693eb8f61ac71f5a0088242fbf55665e) --- .../payload/evolvable/batch/PipeTabletEventBatch.java | 4 ++-- .../payload/evolvable/batch/PipeTransferBatchReqBuilder.java | 6 +++--- .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 7 ++++--- .../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java | 4 ++-- .../db/pipe/task/subtask/connector/PipeConnectorSubtask.java | 12 +++++++----- .../subtask/connector/PipeConnectorSubtaskLifeCycle.java | 5 +++-- .../task/subtask/connector/PipeConnectorSubtaskManager.java | 6 +++--- .../subtask/connector/PipeRealtimePriorityBlockingQueue.java | 7 ++++--- .../task/subtask/SubscriptionConnectorSubtaskLifeCycle.java | 2 +- .../task/subtask/SubscriptionConnectorSubtaskManager.java | 6 +++--- .../commons/pipe/connector/protocol/IoTDBConnector.java | 2 +- .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 5 +++++ .../commons/pipe/task/connection/BlockingPendingQueue.java | 5 +++-- 13 files changed, 41 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java index 75d4b2f9e3c..c7b8e1f0615 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java @@ -123,10 +123,10 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { * Discard all events of the given pipe. 
This method only clears the reference count of the events * and discard them, but do not modify other objects (such as buffers) for simplicity. */ - public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { + public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName())) { + if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) { event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName()); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 77fd73f8aba..9b787ee30f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -184,9 +184,9 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); } - public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop); - endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop)); + public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId); + endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index f68d86dd7e8..b266d8cca3b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -511,14 +511,15 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { //////////////////////////// Operations for close //////////////////////////// @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { + public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { if (isTabletBatchModeEnabled) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop); + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) { + && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) + && regionId == ((EnrichedEvent) event).getRegionId()) { ((EnrichedEvent) event) .clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName()); return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index 772a55b0e17..f67a2b88dd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -493,8 +493,8 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { } @Override - public 
synchronized void discardEventsOfPipe(final String pipeNameToDrop) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop); + public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index d55f18f9838..7c88c962466 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -218,9 +218,9 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe(final String pipeNameToDrop) { + public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop); + inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); // synchronized to use the lastEvent & lastExceptionEvent synchronized (this) { @@ -230,7 +230,8 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { // Note that since we use a new thread to stop all the pipes, we will not encounter deadlock // here. Or else we will. 
if (lastEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())) { + && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) + && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { // Do not clear last event's reference count because it may be on transferring lastEvent = null; // Submit self to avoid that the lastEvent has been retried "max times" times and has @@ -252,13 +253,14 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { // clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())) { + && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) + && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { clearReferenceCountAndReleaseLastExceptionEvent(); } } if (outputPipeConnector instanceof IoTDBConnector) { - ((IoTDBConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop); + ((IoTDBConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java index e3d4d35a171..26c18b3dad6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java @@ -85,16 +85,17 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable { * connector scheduling. 
* * @param pipeNameToDeregister pipe name + * @param regionId region id * @return {@code true} if the {@link PipeConnectorSubtask} is out of life cycle, indicating that * the {@link PipeConnectorSubtask} should never be used again * @throws IllegalStateException if {@link PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister(final String pipeNameToDeregister) { + public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } - subtask.discardEventsOfPipe(pipeNameToDeregister); + subtask.discardEventsOfPipe(pipeNameToDeregister, regionId); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java index f8809b6b689..5bb14073648 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java @@ -179,7 +179,7 @@ public class PipeConnectorSubtaskManager { public synchronized void deregister( final String pipeName, final long creationTime, - final int dataRegionId, + final int regionId, final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); @@ -187,13 +187,13 @@ public class PipeConnectorSubtaskManager { final List<PipeConnectorSubtaskLifeCycle> lifeCycles = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - lifeCycles.removeIf(o -> o.deregister(pipeName)); + lifeCycles.removeIf(o -> o.deregister(pipeName, regionId)); if 
(lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } - PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, dataRegionId); + PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, regionId); } public synchronized void start(final String attributeSortedString) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java index 6b33c248bfb..beabba3128a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java @@ -168,12 +168,13 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ } @Override - public void discardEventsOfPipe(final String pipeNameToDrop) { - super.discardEventsOfPipe(pipeNameToDrop); + public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, regionId); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) { + && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) + && regionId == ((EnrichedEvent) event).getRegionId()) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java index c05bd13f07c..7e7f5fa1d21 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java @@ -69,7 +69,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL } @Override - public synchronized boolean deregister(final String ignored) { + public synchronized boolean deregister(final String ignored, int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java index 45d62174e4b..377790836fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java @@ -164,7 +164,7 @@ public class SubscriptionConnectorSubtaskManager { public synchronized void deregister( final String pipeName, final long creationTime, - final int dataRegionId, + final int regionId, final String attributeSortedString) { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString); @@ -172,11 +172,11 @@ public class SubscriptionConnectorSubtaskManager { final PipeConnectorSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName)) { + if (lifeCycle.deregister(pipeName, regionId)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } - PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId); + PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, regionId); } public synchronized void start(final String attributeSortedString) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index eb66a638672..e1f8f9ed1d5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -476,7 +476,7 @@ public abstract class IoTDBConnector implements PipeConnector { * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard * its batched or queued events in the output pipe connector. */ - public synchronized void discardEventsOfPipe(final String pipeName) { + public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) { // Do nothing by default } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 4429d4d8b92..7c7d6fe9ea2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -292,6 +292,11 @@ public abstract class EnrichedEvent implements Event { return creationTime; } + public final int getRegionId() { + // TODO: persist regionId in EnrichedEvent + return committerKey == null ? 
-1 : committerKey.getRegionId(); + } + public final boolean isDataRegionEvent() { return !(this instanceof PipeWritePlanEvent) && !(this instanceof PipeSnapshotEvent); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java index c67e80181f9..e981818b87e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java @@ -124,11 +124,12 @@ public abstract class BlockingPendingQueue<E extends Event> { eventCounter.reset(); } - public void discardEventsOfPipe(final String pipeNameToDrop) { + public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) { + && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) + && regionId == ((EnrichedEvent) event).getRegionId()) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); }
