This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c8419ec7fc9 Pipe: Ensure at-least-once semantic by forcibly starting 
PipeHistoricalDataRegionTsFileExtractor after pipe restart (#12010)
c8419ec7fc9 is described below

commit c8419ec7fc9e9e236ba77df31b9ad6f8c7262552
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 1 18:16:59 2024 +0800

    Pipe: Ensure at-least-once semantic by forcibly starting 
PipeHistoricalDataRegionTsFileExtractor after pipe restart (#12010)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 12 ++++++---
 .../pipe/task/builder/PipeDataNodeTaskBuilder.java | 31 +++++++++++++++++++---
 .../pipe/config/constant/SystemConstant.java       | 30 +++++++++++++++++++++
 3 files changed, 67 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 88c7fbcb6fd..d4f3f98e10c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.historical;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -125,12 +126,17 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       }
     }
 
-    // User may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
+    // Historical data extraction is enabled in the following cases:
+    // 1. System restarts the pipe. If the pipe is restarted but historical 
data extraction is not
+    // enabled, the pipe will lose some historical data.
+    // 2. User may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
     // enabling the historical data extraction, which may affect the realtime 
data extraction.
     isHistoricalExtractorEnabled =
         parameters.getBooleanOrDefault(
-            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
-            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+                SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
+            || parameters.getBooleanOrDefault(
+                Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+                EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
 
     try {
       historicalDataExtractionStartTime =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index a47c74ab5ba..4d5794e80de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.task.builder;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
@@ -28,6 +30,10 @@ import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.HashMap;
+import java.util.Map;
 
 public abstract class PipeDataNodeTaskBuilder {
 
@@ -38,6 +44,8 @@ public abstract class PipeDataNodeTaskBuilder {
   protected final PipeProcessorSubtaskExecutor processorExecutor;
   protected final PipeConnectorSubtaskExecutor connectorExecutor;
 
+  protected final Map<String, String> systemParameters;
+
   protected PipeDataNodeTaskBuilder(
       PipeStaticMeta pipeStaticMeta,
       TConsensusGroupId regionId,
@@ -49,6 +57,7 @@ public abstract class PipeDataNodeTaskBuilder {
     this.pipeTaskMeta = pipeTaskMeta;
     this.processorExecutor = processorExecutor;
     this.connectorExecutor = connectorExecutor;
+    systemParameters = generateSystemParameters();
   }
 
   public PipeDataNodeTask build() {
@@ -59,7 +68,7 @@ public abstract class PipeDataNodeTaskBuilder {
         new PipeTaskExtractorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            pipeStaticMeta.getExtractorParameters(),
+            
blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters()),
             regionId,
             pipeTaskMeta);
 
@@ -67,7 +76,7 @@ public abstract class PipeDataNodeTaskBuilder {
         new PipeTaskConnectorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            pipeStaticMeta.getConnectorParameters(),
+            
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
             regionId,
             connectorExecutor);
 
@@ -76,7 +85,7 @@ public abstract class PipeDataNodeTaskBuilder {
         new PipeTaskProcessorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            pipeStaticMeta.getProcessorParameters(),
+            
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
             regionId,
             extractorStage.getEventSupplier(),
             connectorStage.getPipeConnectorPendingQueue(),
@@ -85,4 +94,20 @@ public abstract class PipeDataNodeTaskBuilder {
     return new PipeDataNodeTask(
         pipeStaticMeta.getPipeName(), regionId, extractorStage, 
processorStage, connectorStage);
   }
+
+  private Map<String, String> generateSystemParameters() {
+    final Map<String, String> systemParameters = new HashMap<>();
+    if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)) {
+      systemParameters.put(SystemConstant.RESTART_KEY, 
Boolean.TRUE.toString());
+    }
+    return systemParameters;
+  }
+
+  private PipeParameters blendUserAndSystemParameters(PipeParameters 
userParameters) {
+    // Deep copy the user parameters to avoid modification of the original 
parameters.
+    // If the original parameters are modified, progress index report will be 
affected.
+    final Map<String, String> blendedParameters = new 
HashMap<>(userParameters.getAttribute());
+    blendedParameters.putAll(systemParameters);
+    return new PipeParameters(blendedParameters);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
new file mode 100644
index 00000000000..c77c56fb9c0
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config.constant;
+
+public class SystemConstant {
+
+  public static final String RESTART_KEY = "__system.restart";
+  public static final boolean RESTART_DEFAULT_VALUE = false;
+
+  private SystemConstant() {
+    throw new IllegalStateException("Utility class");
+  }
+}

Reply via email to