This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b71f769693 Pipe: Fix data lost when syncing between clusters cause by 
senders' data region leader change (#13532)
6b71f769693 is described below

commit 6b71f769693eb8f61ac71f5a0088242fbf55665e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 18 15:06:54 2024 +0800

    Pipe: Fix data lost when syncing between clusters cause by senders' data 
region leader change (#13532)
---
 .../payload/evolvable/batch/PipeTabletEventBatch.java        |  4 ++--
 .../payload/evolvable/batch/PipeTransferBatchReqBuilder.java |  6 +++---
 .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java |  7 ++++---
 .../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java   |  4 ++--
 .../db/pipe/task/subtask/connector/PipeConnectorSubtask.java | 12 +++++++-----
 .../subtask/connector/PipeConnectorSubtaskLifeCycle.java     |  5 +++--
 .../task/subtask/connector/PipeConnectorSubtaskManager.java  |  6 +++---
 .../subtask/connector/PipeRealtimePriorityBlockingQueue.java |  7 ++++---
 .../task/subtask/SubscriptionConnectorSubtaskLifeCycle.java  |  2 +-
 .../task/subtask/SubscriptionConnectorSubtaskManager.java    |  6 +++---
 .../commons/pipe/connector/protocol/IoTDBConnector.java      |  2 +-
 .../org/apache/iotdb/commons/pipe/event/EnrichedEvent.java   |  5 +++++
 .../commons/pipe/task/connection/BlockingPendingQueue.java   |  5 +++--
 13 files changed, 41 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 75d4b2f9e3c..c7b8e1f0615 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -123,10 +123,10 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
    * Discard all events of the given pipe. This method only clears the 
reference count of the events
    * and discard them, but do not modify other objects (such as buffers) for 
simplicity.
    */
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
     events.removeIf(
         event -> {
-          if (pipeNameToDrop.equals(event.getPipeName())) {
+          if (pipeNameToDrop.equals(event.getPipeName()) && regionId == 
event.getRegionId()) {
             
event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
             return true;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 77fd73f8aba..9b787ee30f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -184,9 +184,9 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
         && 
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
   }
 
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
-    defaultBatch.discardEventsOfPipe(pipeNameToDrop);
-    endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(pipeNameToDrop));
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
+    defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
+    endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(pipeNameToDrop, regionId));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index f68d86dd7e8..b266d8cca3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -511,14 +511,15 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   //////////////////////////// Operations for close 
////////////////////////////
 
   @Override
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
     if (isTabletBatchModeEnabled) {
-      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+      tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
     }
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
+              && regionId == ((EnrichedEvent) event).getRegionId()) {
             ((EnrichedEvent) event)
                 
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
             return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 772a55b0e17..f67a2b88dd6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -493,8 +493,8 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
   }
 
   @Override
-  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
-    tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop, 
final int regionId) {
+    tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index d55f18f9838..7c88c962466 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -218,9 +218,9 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
    * its queued events in the output pipe connector.
    */
-  public void discardEventsOfPipe(final String pipeNameToDrop) {
+  public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
     // Try to remove the events as much as possible
-    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop);
+    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
 
     // synchronized to use the lastEvent & lastExceptionEvent
     synchronized (this) {
@@ -230,7 +230,8 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       // Note that since we use a new thread to stop all the pipes, we will 
not encounter deadlock
       // here. Or else we will.
       if (lastEvent instanceof EnrichedEvent
-          && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())) 
{
+          && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
+          && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
         // Do not clear last event's reference count because it may be on 
transferring
         lastEvent = null;
         // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
@@ -252,13 +253,14 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       // clear the lastExceptionEvent. It's safe to potentially clear it twice 
because we have the
       // "nonnull" detection.
       if (lastExceptionEvent instanceof EnrichedEvent
-          && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())) {
+          && pipeNameToDrop.equals(((EnrichedEvent) 
lastExceptionEvent).getPipeName())
+          && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
         clearReferenceCountAndReleaseLastExceptionEvent();
       }
     }
 
     if (outputPipeConnector instanceof IoTDBConnector) {
-      ((IoTDBConnector) 
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
+      ((IoTDBConnector) 
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, regionId);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index e3d4d35a171..26c18b3dad6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -85,16 +85,17 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
    * connector scheduling.
    *
    * @param pipeNameToDeregister pipe name
+   * @param regionId region id
    * @return {@code true} if the {@link PipeConnectorSubtask} is out of life 
cycle, indicating that
    *     the {@link PipeConnectorSubtask} should never be used again
    * @throws IllegalStateException if {@link 
PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0
    */
-  public synchronized boolean deregister(final String pipeNameToDeregister) {
+  public synchronized boolean deregister(final String pipeNameToDeregister, 
int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
 
-    subtask.discardEventsOfPipe(pipeNameToDeregister);
+    subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
 
     try {
       if (registeredTaskCount > 1) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index f8809b6b689..5bb14073648 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -179,7 +179,7 @@ public class PipeConnectorSubtaskManager {
   public synchronized void deregister(
       final String pipeName,
       final long creationTime,
-      final int dataRegionId,
+      final int regionId,
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
@@ -187,13 +187,13 @@ public class PipeConnectorSubtaskManager {
 
     final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    lifeCycles.removeIf(o -> o.deregister(pipeName));
+    lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
-    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId);
+    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
regionId);
   }
 
   public synchronized void start(final String attributeSortedString) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index 6b33c248bfb..beabba3128a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -168,12 +168,13 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   }
 
   @Override
-  public void discardEventsOfPipe(final String pipeNameToDrop) {
-    super.discardEventsOfPipe(pipeNameToDrop);
+  public void discardEventsOfPipe(final String pipeNameToDrop, final int 
regionId) {
+    super.discardEventsOfPipe(pipeNameToDrop, regionId);
     tsfileInsertEventDeque.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
+              && regionId == ((EnrichedEvent) event).getRegionId()) {
             if (((EnrichedEvent) event)
                 
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index c05bd13f07c..7e7f5fa1d21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -69,7 +69,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
   }
 
   @Override
-  public synchronized boolean deregister(final String ignored) {
+  public synchronized boolean deregister(final String ignored, int regionId) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 45d62174e4b..377790836fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -164,7 +164,7 @@ public class SubscriptionConnectorSubtaskManager {
   public synchronized void deregister(
       final String pipeName,
       final long creationTime,
-      final int dataRegionId,
+      final int regionId,
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
@@ -172,11 +172,11 @@ public class SubscriptionConnectorSubtaskManager {
 
     final PipeConnectorSubtaskLifeCycle lifeCycle =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    if (lifeCycle.deregister(pipeName)) {
+    if (lifeCycle.deregister(pipeName, regionId)) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
-    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId);
+    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
regionId);
   }
 
   public synchronized void start(final String attributeSortedString) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index eb66a638672..e1f8f9ed1d5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -476,7 +476,7 @@ public abstract class IoTDBConnector implements 
PipeConnector {
    * When a pipe is dropped, the connector maybe reused and will not be 
closed. We need to discard
    * its batched or queued events in the output pipe connector.
    */
-  public synchronized void discardEventsOfPipe(final String pipeName) {
+  public synchronized void discardEventsOfPipe(final String pipeName, final 
int regionId) {
     // Do nothing by default
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 4429d4d8b92..7c7d6fe9ea2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -292,6 +292,11 @@ public abstract class EnrichedEvent implements Event {
     return creationTime;
   }
 
+  public final int getRegionId() {
+    // TODO: persist regionId in EnrichedEvent
+    return committerKey == null ? -1 : committerKey.getRegionId();
+  }
+
   public final boolean isDataRegionEvent() {
     return !(this instanceof PipeWritePlanEvent) && !(this instanceof 
PipeSnapshotEvent);
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index c67e80181f9..e981818b87e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -124,11 +124,12 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
     eventCounter.reset();
   }
 
-  public void discardEventsOfPipe(final String pipeNameToDrop) {
+  public void discardEventsOfPipe(final String pipeNameToDrop, final int 
regionId) {
     pendingQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
+              && regionId == ((EnrichedEvent) event).getRegionId()) {
             if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
             }

Reply via email to