This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 14057a3cd93 Pipe: Add 'data.delete' & 'realtime-first' conflict check in PipeParameters (#13433)
14057a3cd93 is described below

commit 14057a3cd9368f8de644a1ecc7062a48c13823d7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Sep 13 12:56:46 2024 +0800

    Pipe: Add 'data.delete' & 'realtime-first' conflict check in PipeParameters (#13433)
---
 .../api/customizer/parameter/PipeParameters.java   |  4 ++
 .../pipe/task/builder/PipeDataNodeTaskBuilder.java | 57 ++++++++++++++++++++--
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 1646423b133..4d0f75790dc 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -71,6 +71,10 @@ public class PipeParameters {
     return false;
   }
 
+  public void addAttribute(final String key, String values) {
+    attributes.put(KeyReducer.reduce(key), values);
+  }
+
   public String getString(final String key) {
     final String value = attributes.get(key);
     return value != null ? value : attributes.get(KeyReducer.reduce(key));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
index 31f46e73e3a..85ea6980212 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/builder/PipeDataNodeTaskBuilder.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.task.builder;
 
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -27,6 +29,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeType;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.PipeSubtaskExecutorManager;
+import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskExtractorStage;
@@ -34,6 +37,10 @@ import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
 import org.apache.iotdb.db.subscription.task.stage.SubscriptionTaskConnectorStage;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -46,6 +53,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 
 public class PipeDataNodeTaskBuilder {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskBuilder.class);
+
   private final PipeStaticMeta pipeStaticMeta;
   private final int regionId;
   private final PipeTaskMeta pipeTaskMeta;
@@ -77,23 +86,31 @@ public class PipeDataNodeTaskBuilder {
   public PipeDataNodeTask build() {
     // Event flow: extractor -> processor -> connector
 
+    // Analyzes the PipeParameters to identify potential conflicts.
+    final PipeParameters extractorParameters =
+        blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters());
+    final PipeParameters connectorParameters =
+        blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters());
+    checkConflict(extractorParameters, connectorParameters);
+
     // We first build the extractor and connector, then build the processor.
     final PipeTaskExtractorStage extractorStage =
         new PipeTaskExtractorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            blendUserAndSystemParameters(pipeStaticMeta.getExtractorParameters()),
+            extractorParameters,
             regionId,
             pipeTaskMeta);
 
     final PipeTaskConnectorStage connectorStage;
     final PipeType pipeType = pipeStaticMeta.getPipeType();
+
     if (PipeType.SUBSCRIPTION.equals(pipeType)) {
       connectorStage =
           new SubscriptionTaskConnectorStage(
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
-              blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
+              connectorParameters,
               regionId,
               CONNECTOR_EXECUTOR_MAP.get(pipeType));
     } else { // user pipe or consensus pipe
@@ -101,7 +118,7 @@ public class PipeDataNodeTaskBuilder {
           new PipeTaskConnectorStage(
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
-              blendUserAndSystemParameters(pipeStaticMeta.getConnectorParameters()),
+              connectorParameters,
               regionId,
               CONNECTOR_EXECUTOR_MAP.get(pipeType));
     }
@@ -141,4 +158,38 @@ public class PipeDataNodeTaskBuilder {
     blendedParameters.putAll(systemParameters);
     return new PipeParameters(blendedParameters);
   }
+
+  private void checkConflict(
+      final PipeParameters extractorParameters, final PipeParameters connectorParameters) {
+
+    try {
+      final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+          DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
+      if (!insertionDeletionListeningOptionPair.right) {
+        return;
+      }
+    } catch (IllegalPathException e) {
+      LOGGER.warn(
+          "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}",
+          e.getMessage(),
+          e);
+      return;
+    }
+
+    final Boolean isRealtime =
+        connectorParameters.getBooleanByKeys(
+            PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+            PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
+    if (isRealtime == null) {
+      connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false");
+      LOGGER.info(
+          "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion.");
+      return;
+    }
+
+    if (isRealtime) {
+      LOGGER.warn(
+          "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion.");
+    }
+  }
 }

Reply via email to