This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 83116396ae7 PipeConsensus: Fix circular replicate in kill -9/restart cases (#12937)
83116396ae7 is described below

commit 83116396ae76c0040f064c53f1da3d3fbec90323
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Jul 18 01:13:42 2024 -0500

    PipeConsensus: Fix circular replicate in kill -9/restart cases (#12937)
---
 .../db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java |  7 +++++++
 .../pipe/processor/pipeconsensus/PipeConsensusProcessor.java  |  7 ++++++-
 .../protocol/pipeconsensus/PipeConsensusReceiver.java         |  1 +
 .../db/storageengine/dataregion/tsfile/TsFileResource.java    | 11 +++++++++++
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index ae53e72b8e1..4194670493e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -62,6 +62,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
 
   private final boolean isLoaded;
   private final boolean isGeneratedByPipe;
+  private final boolean isGeneratedByPipeConsensus;
 
   private final AtomicBoolean isClosed;
   private TsFileInsertionDataContainer dataContainer;
@@ -108,6 +109,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
 
     this.isLoaded = isLoaded;
     this.isGeneratedByPipe = isGeneratedByPipe;
+    this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
 
     isClosed = new AtomicBoolean(resource.isClosed());
     // Register close listener if TsFile is not closed
@@ -363,6 +365,11 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     }
   }
 
+  /** The method is used to prevent circular replication in PipeConsensus */
+  public boolean isGeneratedByPipeConsensus() {
+    return isGeneratedByPipeConsensus;
+  }
+
   private TsFileInsertionDataContainer initDataContainer() {
     try {
       if (dataContainer == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
index ee2bfc5ce5d..bbb02084514 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
@@ -69,7 +70,10 @@ public class PipeConsensusProcessor implements PipeProcessor {
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
       throws Exception {
-    if (tsFileInsertionEvent instanceof EnrichedEvent) {
+    // Only user-generated TsFileInsertionEvent can be replicated. Any tsFile synchronized from a
+    // replica should not be replicated again
+    if (tsFileInsertionEvent instanceof EnrichedEvent
+        && !((PipeTsFileInsertionEvent) tsFileInsertionEvent).isGeneratedByPipeConsensus()) {
       final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
       if (isContainLocalData(enrichedEvent)) {
         eventCollector.collect(tsFileInsertionEvent);
@@ -80,6 +84,7 @@ public class PipeConsensusProcessor implements PipeProcessor {
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
       throws Exception {
+    // Only user-generated TabletInsertionEvent can be replicated.
     if (tabletInsertionEvent instanceof EnrichedEvent) {
       final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
       if (isContainLocalData(enrichedEvent)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index d623e5bbe6e..ce289ec341a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -682,6 +682,7 @@ public class PipeConsensusReceiver {
 
     tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
     tsFileResource.setProgressIndex(progressIndex);
+    tsFileResource.setGeneratedByPipeConsensus(true);
     tsFileResource.serialize();
     return tsFileResource;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 51c7d482007..80335bab9cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -161,6 +161,9 @@ public class TsFileResource {
 
   private ProgressIndex maxProgressIndex;
 
+  /** used to prevent circular replication in PipeConsensus */
+  private boolean isGeneratedByPipeConsensus = false;
+
   private InsertionCompactionCandidateStatus insertionCompactionCandidateStatus =
       InsertionCompactionCandidateStatus.NOT_CHECKED;
 
@@ -510,6 +513,14 @@ public class TsFileResource {
     return processor;
   }
 
+  public boolean isGeneratedByPipeConsensus() {
+    return isGeneratedByPipeConsensus;
+  }
+
+  public void setGeneratedByPipeConsensus(boolean generatedByPipeConsensus) {
+    isGeneratedByPipeConsensus = generatedByPipeConsensus;
+  }
+
   public void writeLock() {
     if (originTsFileResource == null) {
       tsFileLock.writeLock();

Reply via email to