This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2ee8e15f2b8 Pipe: Added skip-if semantic to iotdb-connector (#15530)
(#15553)
2ee8e15f2b8 is described below
commit 2ee8e15f2b868ae420a36bea1c15188651feb598
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 22 21:49:03 2025 +0800
Pipe: Added skip-if semantic to iotdb-connector (#15530) (#15553)
---
.../config/constant/PipeConnectorConstant.java | 5 +++
.../pipe/connector/protocol/IoTDBConnector.java | 27 ++++++++++++-
.../pipe/receiver/PipeReceiverStatusHandler.java | 45 +++++++++++++++++++++-
3 files changed, 75 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 8a04d133d60..f570319a08e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -250,6 +250,11 @@ public class PipeConnectorConstant {
public static final String SINK_MARK_AS_PIPE_REQUEST_KEY =
"sink.mark-as-pipe-request";
public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE =
true;
+ public static final String CONNECTOR_SKIP_IF_KEY = "connector.skipif";
+ public static final String SINK_SKIP_IF_KEY = "sink.skipif";
+ public static final String CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES =
"no-privileges";
+ public static final String IOTDB_CONNECTOR_SKIP_IF_DEFAULT_VALUE = "";
+
public static final String CONNECTOR_OPC_DA_CLSID_KEY =
"connector.opcda.clsid";
public static final String SINK_OPC_DA_CLSID_KEY = "sink.opcda.clsid";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 1a5bb373f42..ea450095a84 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -52,6 +52,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -88,6 +89,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
@@ -103,6 +105,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_SKIP_IF_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.IOTDB_CONNECTOR_SKIP_IF_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_ZSTD_LEVEL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
@@ -128,6 +132,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY;
public abstract class IoTDBConnector implements PipeConnector {
@@ -380,6 +385,25 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}",
shouldMarkAsPipeRequest);
+ final String connectorSkipIfValue =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_SKIP_IF_KEY, SINK_SKIP_IF_KEY),
+ IOTDB_CONNECTOR_SKIP_IF_DEFAULT_VALUE)
+ .trim();
+ final Set<String> skipIfOptionSet =
+ Arrays.stream(connectorSkipIfValue.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet());
+ boolean skipIfNoPrivileges =
skipIfOptionSet.remove(CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES);
+ if (!skipIfOptionSet.isEmpty()) {
+ throw new PipeParameterNotValidException(
+ String.format("Parameters in set %s are not allowed in 'skipif'",
skipIfOptionSet));
+ }
+ LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+
receiverStatusHandler =
new PipeReceiverStatusHandler(
parameters
@@ -409,7 +433,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
Arrays.asList(
CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
- CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));
+ CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE),
+ skipIfNoPrivileges);
shouldReceiverConvertOnTypeMismatch =
parameters.getBooleanOrDefault(
Arrays.asList(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 0f87d361bb0..040b1a795d2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -50,6 +50,7 @@ public class PipeReceiverStatusHandler {
private final long retryMaxMillisWhenOtherExceptionsOccur;
private final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur;
+ private final boolean skipIfNoPrivileges;
private final AtomicLong exceptionFirstEncounteredTime = new AtomicLong(0);
private final AtomicBoolean exceptionEventHasBeenRetried = new
AtomicBoolean(false);
@@ -60,7 +61,8 @@ public class PipeReceiverStatusHandler {
final long retryMaxSecondsWhenConflictOccurs,
final boolean shouldRecordIgnoredDataWhenConflictOccurs,
final long retryMaxSecondsWhenOtherExceptionsOccur,
- final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur) {
+ final boolean shouldRecordIgnoredDataWhenOtherExceptionsOccur,
+ final boolean skipIfNoPrivileges) {
this.isRetryAllowedWhenConflictOccurs = isRetryAllowedWhenConflictOccurs;
this.retryMaxMillisWhenConflictOccurs =
retryMaxSecondsWhenConflictOccurs < 0
@@ -74,6 +76,7 @@ public class PipeReceiverStatusHandler {
: retryMaxSecondsWhenOtherExceptionsOccur * 1000;
this.shouldRecordIgnoredDataWhenOtherExceptionsOccur =
shouldRecordIgnoredDataWhenOtherExceptionsOccur;
+ this.skipIfNoPrivileges = skipIfNoPrivileges;
}
/**
@@ -152,6 +155,46 @@ public class PipeReceiverStatusHandler {
Math.min(CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenConflictOccurs * 1.1)));
}
+ case 803: // NO_PERMISSION
+ if (skipIfNoPrivileges) {
+ return;
+ }
+
+ synchronized (this) {
+ recordExceptionStatusIfNecessary(recordMessage);
+
+ if (exceptionEventHasBeenRetried.get()
+ && System.currentTimeMillis() -
exceptionFirstEncounteredTime.get()
+ > retryMaxMillisWhenOtherExceptionsOccur) {
+ LOGGER.warn(
+ "No permission: retry timeout. will be ignored. event: {}.
status: {}",
+ shouldRecordIgnoredDataWhenOtherExceptionsOccur ?
recordMessage : "not recorded",
+ status);
+ resetExceptionStatus();
+ return;
+ }
+
+ LOGGER.warn(
+ "No permission: will retry {}. status: {}",
+ retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE
+ ? "forever"
+ : "for at least "
+ + (retryMaxMillisWhenOtherExceptionsOccur
+ + exceptionFirstEncounteredTime.get()
+ - System.currentTimeMillis())
+ / 1000.0
+ + " seconds",
+ status);
+ exceptionEventHasBeenRetried.set(true);
+ throw new PipeRuntimeConnectorRetryTimesConfigurableException(
+ exceptionMessage,
+ (int)
+ Math.max(
+ PipeSubtask.MAX_RETRY_TIMES,
+ Math.min(
+ CONFLICT_RETRY_MAX_TIMES,
retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
+ }
+
default: // Other exceptions
synchronized (this) {
recordExceptionStatusIfNecessary(recordMessage);