This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83116396ae7 PipeConsensus: Fix circular replicate in kill -9/restart
cases (#12937)
83116396ae7 is described below
commit 83116396ae76c0040f064c53f1da3d3fbec90323
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Jul 18 01:13:42 2024 -0500
PipeConsensus: Fix circular replicate in kill -9/restart cases (#12937)
---
.../db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java | 7 +++++++
.../pipe/processor/pipeconsensus/PipeConsensusProcessor.java | 7 ++++++-
.../protocol/pipeconsensus/PipeConsensusReceiver.java | 1 +
.../db/storageengine/dataregion/tsfile/TsFileResource.java | 11 +++++++++++
4 files changed, 25 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index ae53e72b8e1..4194670493e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -62,6 +62,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private final boolean isLoaded;
private final boolean isGeneratedByPipe;
+ private final boolean isGeneratedByPipeConsensus;
private final AtomicBoolean isClosed;
private TsFileInsertionDataContainer dataContainer;
@@ -108,6 +109,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
this.isLoaded = isLoaded;
this.isGeneratedByPipe = isGeneratedByPipe;
+ this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
isClosed = new AtomicBoolean(resource.isClosed());
// Register close listener if TsFile is not closed
@@ -363,6 +365,11 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
}
+ /** Returns whether the underlying TsFile was generated by PipeConsensus; used to prevent circular replication. */
+ public boolean isGeneratedByPipeConsensus() {
+ return isGeneratedByPipeConsensus;
+ }
+
private TsFileInsertionDataContainer initDataContainer() {
try {
if (dataContainer == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
index ee2bfc5ce5d..bbb02084514 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
@@ -69,7 +70,10 @@ public class PipeConsensusProcessor implements PipeProcessor
{
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent,
EventCollector eventCollector)
throws Exception {
- if (tsFileInsertionEvent instanceof EnrichedEvent) {
+ // Only user-generated TsFileInsertionEvent can be replicated. Any tsFile synchronized from a
+ // replica should not be replicated again.
+ if (tsFileInsertionEvent instanceof EnrichedEvent
+ && !((PipeTsFileInsertionEvent)
tsFileInsertionEvent).isGeneratedByPipeConsensus()) {
final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
if (isContainLocalData(enrichedEvent)) {
eventCollector.collect(tsFileInsertionEvent);
@@ -80,6 +84,7 @@ public class PipeConsensusProcessor implements PipeProcessor {
@Override
public void process(TabletInsertionEvent tabletInsertionEvent,
EventCollector eventCollector)
throws Exception {
+ // Only user-generated TabletInsertionEvent can be replicated.
if (tabletInsertionEvent instanceof EnrichedEvent) {
final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
if (isContainLocalData(enrichedEvent)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index d623e5bbe6e..ce289ec341a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -682,6 +682,7 @@ public class PipeConsensusReceiver {
tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
tsFileResource.setProgressIndex(progressIndex);
+ tsFileResource.setGeneratedByPipeConsensus(true);
tsFileResource.serialize();
return tsFileResource;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 51c7d482007..80335bab9cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -161,6 +161,9 @@ public class TsFileResource {
private ProgressIndex maxProgressIndex;
+ /** Whether this TsFile was generated by PipeConsensus (default false); used to prevent circular replication. */
+ private boolean isGeneratedByPipeConsensus = false;
+
private InsertionCompactionCandidateStatus
insertionCompactionCandidateStatus =
InsertionCompactionCandidateStatus.NOT_CHECKED;
@@ -510,6 +513,14 @@ public class TsFileResource {
return processor;
}
+ public boolean isGeneratedByPipeConsensus() {
+ return isGeneratedByPipeConsensus;
+ }
+
+ public void setGeneratedByPipeConsensus(boolean generatedByPipeConsensus) {
+ isGeneratedByPipeConsensus = generatedByPipeConsensus;
+ }
+
public void writeLock() {
if (originTsFileResource == null) {
tsFileLock.writeLock();