This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c8419ec7fc9 Pipe: Ensure at-least-once semantic by forcibly starting
PipeHistoricalDataRegionTsFileExtractor after pipe restart (#12010)
c8419ec7fc9 is described below
commit c8419ec7fc9e9e236ba77df31b9ad6f8c7262552
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 1 18:16:59 2024 +0800
Pipe: Ensure at-least-once semantic by forcibly starting
PipeHistoricalDataRegionTsFileExtractor after pipe restart (#12010)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 12 ++++++---
.../pipe/task/builder/PipeDataNodeTaskBuilder.java | 31 +++++++++++++++++++---
.../pipe/config/constant/SystemConstant.java | 30 +++++++++++++++++++++
3 files changed, 67 insertions(+), 6 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 88c7fbcb6fd..d4f3f98e10c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor.historical;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -125,12 +126,17 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
}
}
- // User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without
+ // Historical data extraction is enabled in the following cases:
+ // 1. System restarts the pipe. If the pipe is restarted but historical data extraction is not
+ // enabled, the pipe will lose some historical data.
+ // 2. User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without
// enabling the historical data extraction, which may affect the realtime data extraction.
isHistoricalExtractorEnabled =
parameters.getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
- EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+ SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
+ || parameters.getBooleanOrDefault(
+ Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
+ EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
try {
historicalDataExtractionStartTime =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index a47c74ab5ba..4d5794e80de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.task.builder;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
@@ -28,6 +30,10 @@ import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.HashMap;
+import java.util.Map;
public abstract class PipeDataNodeTaskBuilder {
@@ -38,6 +44,8 @@ public abstract class PipeDataNodeTaskBuilder {
protected final PipeProcessorSubtaskExecutor processorExecutor;
protected final PipeConnectorSubtaskExecutor connectorExecutor;
+ protected final Map<String, String> systemParameters;
+
protected PipeDataNodeTaskBuilder(
PipeStaticMeta pipeStaticMeta,
TConsensusGroupId regionId,
@@ -49,6 +57,7 @@ public abstract class PipeDataNodeTaskBuilder {
this.pipeTaskMeta = pipeTaskMeta;
this.processorExecutor = processorExecutor;
this.connectorExecutor = connectorExecutor;
+ systemParameters = generateSystemParameters();
}
public PipeDataNodeTask build() {
@@ -59,7 +68,7 @@ public abstract class PipeDataNodeTaskBuilder {
new PipeTaskExtractorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getExtractorParameters(),
+ blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters()),
regionId,
pipeTaskMeta);
@@ -67,7 +76,7 @@ public abstract class PipeDataNodeTaskBuilder {
new PipeTaskConnectorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getConnectorParameters(),
+ blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
regionId,
connectorExecutor);
@@ -76,7 +85,7 @@ public abstract class PipeDataNodeTaskBuilder {
new PipeTaskProcessorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
- pipeStaticMeta.getProcessorParameters(),
+ blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
regionId,
extractorStage.getEventSupplier(),
connectorStage.getPipeConnectorPendingQueue(),
@@ -85,4 +94,20 @@ public abstract class PipeDataNodeTaskBuilder {
return new PipeDataNodeTask(
pipeStaticMeta.getPipeName(), regionId, extractorStage, processorStage, connectorStage);
}
+
+ private Map<String, String> generateSystemParameters() {
+ final Map<String, String> systemParameters = new HashMap<>();
+ if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)) {
+ systemParameters.put(SystemConstant.RESTART_KEY, Boolean.TRUE.toString());
+ }
+ return systemParameters;
+ }
+
+ private PipeParameters blendUserAndSystemParameters(PipeParameters userParameters) {
+ // Deep copy the user parameters to avoid modification of the original parameters.
+ // If the original parameters are modified, progress index report will be affected.
+ final Map<String, String> blendedParameters = new HashMap<>(userParameters.getAttribute());
+ blendedParameters.putAll(systemParameters);
+ return new PipeParameters(blendedParameters);
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
new file mode 100644
index 00000000000..c77c56fb9c0
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config.constant;
+
+public class SystemConstant {
+
+ public static final String RESTART_KEY = "__system.restart";
+ public static final boolean RESTART_DEFAULT_VALUE = false;
+
+ private SystemConstant() {
+ throw new IllegalStateException("Utility class");
+ }
+}