This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2ee8e15f2b8 Pipe: Added skip-if semantic to iotdb-connector (#15530) 
(#15553)
2ee8e15f2b8 is described below

commit 2ee8e15f2b868ae420a36bea1c15188651feb598
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 22 21:49:03 2025 +0800

    Pipe: Added skip-if semantic to iotdb-connector (#15530) (#15553)
---
 .../config/constant/PipeConnectorConstant.java     |  5 +++
 .../pipe/connector/protocol/IoTDBConnector.java    | 27 ++++++++++++-
 .../pipe/receiver/PipeReceiverStatusHandler.java   | 45 +++++++++++++++++++++-
 3 files changed, 75 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 8a04d133d60..f570319a08e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -250,6 +250,11 @@ public class PipeConnectorConstant {
   public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = 
"sink.mark-as-pipe-request";
   public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = 
true;
 
+  public static final String CONNECTOR_SKIP_IF_KEY = "connector.skipif";
+  public static final String SINK_SKIP_IF_KEY = "sink.skipif";
+  public static final String CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = 
"no-privileges";
+  public static final String IOTDB_CONNECTOR_SKIP_IF_DEFAULT_VALUE = "";
+
   public static final String CONNECTOR_OPC_DA_CLSID_KEY = 
"connector.opcda.clsid";
   public static final String SINK_OPC_DA_CLSID_KEY = "sink.opcda.clsid";
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 1a5bb373f42..ea450095a84 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -52,6 +52,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -88,6 +89,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
@@ -103,6 +105,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SKIP_IF_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.IOTDB_CONNECTOR_SKIP_IF_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_ZSTD_LEVEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
@@ -128,6 +132,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY;
 
 public abstract class IoTDBConnector implements PipeConnector {
 
@@ -380,6 +385,25 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
     LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
 
+    final String connectorSkipIfValue =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_SKIP_IF_KEY, SINK_SKIP_IF_KEY),
+                IOTDB_CONNECTOR_SKIP_IF_DEFAULT_VALUE)
+            .trim();
+    final Set<String> skipIfOptionSet =
+        Arrays.stream(connectorSkipIfValue.split(","))
+            .map(String::trim)
+            .filter(s -> !s.isEmpty())
+            .map(String::toLowerCase)
+            .collect(Collectors.toSet());
+    boolean skipIfNoPrivileges = 
skipIfOptionSet.remove(CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES);
+    if (!skipIfOptionSet.isEmpty()) {
+      throw new PipeParameterNotValidException(
+          String.format("Parameters in set %s are not allowed in 'skipif'", 
skipIfOptionSet));
+    }
+    LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+
     receiverStatusHandler =
         new PipeReceiverStatusHandler(
             parameters
@@ -409,7 +433,8 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                 Arrays.asList(
                     CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
                     SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
-                CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
+                CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE),
+            skipIfNoPrivileges);
     shouldReceiverConvertOnTypeMismatch =
         parameters.getBooleanOrDefault(
             Arrays.asList(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 0f87d361bb0..040b1a795d2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -50,6 +50,7 @@ public class PipeReceiverStatusHandler {
 
   private final long retryMaxMillisWhenOtherExceptionsOccur;
   private final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur;
+  private final boolean skipIfNoPrivileges;
 
   private final AtomicLong exceptionFirstEncounteredTime = new AtomicLong(0);
   private final AtomicBoolean exceptionEventHasBeenRetried = new 
AtomicBoolean(false);
@@ -60,7 +61,8 @@ public class PipeReceiverStatusHandler {
       final long retryMaxSecondsWhenConflictOccurs,
       final boolean shouldRecordIgnoredDataWhenConflictOccurs,
       final long retryMaxSecondsWhenOtherExceptionsOccur,
-      final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur) {
+      final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur,
+      final boolean skipIfNoPrivileges) {
     this.isRetryAllowedWhenConflictOccurs = isRetryAllowedWhenConflictOccurs;
     this.retryMaxMillisWhenConflictOccurs =
         retryMaxSecondsWhenConflictOccurs < 0
@@ -74,6 +76,7 @@ public class PipeReceiverStatusHandler {
             : retryMaxSecondsWhenOtherExceptionsOccur * 1000;
     this.shouldRecordIgnoredDataWhenOtherExceptionsOccur =
         shouldRecordIgnoredDataWhenOtherExceptionsOccur;
+    this.skipIfNoPrivileges = skipIfNoPrivileges;
   }
 
   /**
@@ -152,6 +155,46 @@ public class PipeReceiverStatusHandler {
                       Math.min(CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenConflictOccurs * 1.1)));
         }
 
+      case 803: // NO_PERMISSION
+        if (skipIfNoPrivileges) {
+          return;
+        }
+
+        synchronized (this) {
+          recordExceptionStatusIfNecessary(recordMessage);
+
+          if (exceptionEventHasBeenRetried.get()
+              && System.currentTimeMillis() - 
exceptionFirstEncounteredTime.get()
+                  > retryMaxMillisWhenOtherExceptionsOccur) {
+            LOGGER.warn(
+                "No permission: retry timeout. will be ignored. event: {}. 
status: {}",
+                shouldRecordIgnoredDataWhenOtherExceptionsOccur ? 
recordMessage : "not recorded",
+                status);
+            resetExceptionStatus();
+            return;
+          }
+
+          LOGGER.warn(
+              "No permission: will retry {}. status: {}",
+              retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE
+                  ? "forever"
+                  : "for at least "
+                      + (retryMaxMillisWhenOtherExceptionsOccur
+                              + exceptionFirstEncounteredTime.get()
+                              - System.currentTimeMillis())
+                          / 1000.0
+                      + " seconds",
+              status);
+          exceptionEventHasBeenRetried.set(true);
+          throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+              exceptionMessage,
+              (int)
+                  Math.max(
+                      PipeSubtask.MAX_RETRY_TIMES,
+                      Math.min(
+                          CONFLICT_RETRY_MAX_TIMES, 
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+        }
+
       default: // Other exceptions
         synchronized (this) {
           recordExceptionStatusIfNecessary(recordMessage);

Reply via email to