This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 14057a3cd93 Pipe: Add 'data.delete' & 'realtime-first' conflict check in PipeParameters (#13433)
14057a3cd93 is described below
commit 14057a3cd9368f8de644a1ecc7062a48c13823d7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Sep 13 12:56:46 2024 +0800
Pipe: Add 'data.delete' & 'realtime-first' conflict check in PipeParameters (#13433)
---
.../api/customizer/parameter/PipeParameters.java | 4 ++
.../pipe/task/builder/PipeDataNodeTaskBuilder.java | 57 ++++++++++++++++++++--
2 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 1646423b133..4d0f75790dc 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -71,6 +71,10 @@ public class PipeParameters {
return false;
}
+ public void addAttribute(final String key, String values) {
+ attributes.put(KeyReducer.reduce(key), values);
+ }
+
public String getString(final String key) {
final String value = attributes.get(key);
return value != null ? value : attributes.get(KeyReducer.reduce(key));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index 31f46e73e3a..85ea6980212 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.task.builder;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -27,6 +29,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeType;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage;
@@ -34,6 +37,10 @@ import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
import org.apache.iotdb.db.subscription.task.stage.SubscriptionTaskConnectorStage;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashMap;
@@ -46,6 +53,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
public class PipeDataNodeTaskBuilder {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeTaskBuilder.class);
+
private final PipeStaticMeta pipeStaticMeta;
private final int regionId;
private final PipeTaskMeta pipeTaskMeta;
@@ -77,23 +86,31 @@ public class PipeDataNodeTaskBuilder {
public PipeDataNodeTask build() {
// Event flow: extractor -> processor -> connector
+ // Analyzes the PipeParameters to identify potential conflicts.
+ final PipeParameters extractorParameters =
+ blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters());
+ final PipeParameters connectorParameters =
+ blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters());
+ checkConflict(extractorParameters, connectorParameters);
+
// We first build the extractor and connector, then build the processor.
final PipeTaskExtractorStage extractorStage =
new PipeTaskExtractorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
-
blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters()),
+ extractorParameters,
regionId,
pipeTaskMeta);
final PipeTaskConnectorStage connectorStage;
final PipeType pipeType = pipeStaticMeta.getPipeType();
+
if (PipeType.SUBSCRIPTION.equals(pipeType)) {
connectorStage =
new SubscriptionTaskConnectorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
-
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
+ connectorParameters,
regionId,
CONNECTOR_EXECUTOR_MAP.get(pipeType));
} else { // user pipe or consensus pipe
@@ -101,7 +118,7 @@ public class PipeDataNodeTaskBuilder {
new PipeTaskConnectorStage(
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
-
blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
+ connectorParameters,
regionId,
CONNECTOR_EXECUTOR_MAP.get(pipeType));
}
@@ -141,4 +158,38 @@ public class PipeDataNodeTaskBuilder {
blendedParameters.putAll(systemParameters);
return new PipeParameters(blendedParameters);
}
+
+ private void checkConflict(
+ final PipeParameters extractorParameters, final PipeParameters
connectorParameters) {
+
+ try {
+ final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
+ if (!insertionDeletionListeningOptionPair.right) {
+ return;
+ }
+ } catch (IllegalPathException e) {
+ LOGGER.warn(
+ "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion'
parameters: {}",
+ e.getMessage(),
+ e);
+ return;
+ }
+
+ final Boolean isRealtime =
+ connectorParameters.getBooleanByKeys(
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+ PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
+ if (isRealtime == null) {
+
connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
"false");
+ LOGGER.info(
+ "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete',
'realtime-first' is defaulted to 'false' to prevent sync issues after
deletion.");
+ return;
+ }
+
+ if (isRealtime) {
+ LOGGER.warn(
+ "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete',
'realtime-first' set to 'true' may result in data synchronization issues after
deletion.");
+ }
+ }
}