This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch auth-fix-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/auth-fix-2 by this push:
new fead66e2cc7 Pipe: Fixed the check for no permission 2 (#16804)
fead66e2cc7 is described below
commit fead66e2cc7caad2d51bc7fdf0524d7c635d04eb
Author: Caideyipi <[email protected]>
AuthorDate: Wed Nov 26 15:01:16 2025 +0800
Pipe: Fixed the check for no permission 2 (#16804)
* edge
* sonar
* emperor-cloth
* emperor-patch
---
.../sink/protocol/IoTDBConfigRegionAirGapSink.java | 6 +-
.../pipe/sink/protocol/IoTDBConfigRegionSink.java | 6 +-
.../airgap/IoTDBSchemaRegionAirGapSink.java | 3 +-
.../thrift/sync/IoTDBSchemaRegionSink.java | 2 +-
.../pipe/receiver/PipeReceiverStatusHandler.java | 33 ++-
.../PipeReceiverStatusHandlerTest.java | 302 +++++++++++++++++++++
6 files changed, 341 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
index 97413d68beb..e1f66733518 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
@@ -183,7 +183,8 @@ public class IoTDBConfigRegionAirGapSink extends
IoTDBAirGapSink {
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
- pipeConfigRegionWritePlanEvent.toString());
+ pipeConfigRegionWritePlanEvent.toString(),
+ true);
}
}
@@ -239,7 +240,8 @@ public class IoTDBConfigRegionAirGapSink extends
IoTDBAirGapSink {
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
- pipeConfigRegionSnapshotEvent.toString());
+ pipeConfigRegionSnapshotEvent.toString(),
+ true);
} else {
LOGGER.info("Successfully transferred config region snapshot {}.",
snapshot);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index d3555e62dfa..1b508eb2f5c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -179,7 +179,8 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink
{
String.format(
"Transfer config region write plan %s error, result status %s.",
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status),
- pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString());
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString(),
+ true);
}
if (LOGGER.isDebugEnabled()) {
@@ -265,7 +266,8 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink
{
String.format(
"Seal config region snapshot file %s error, result status %s.",
snapshotFile, resp.getStatus()),
- snapshotFile.toString());
+ snapshotFile.toString(),
+ true);
}
LOGGER.info("Successfully transferred config region snapshot {}.",
snapshotFile);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index bcbf88d2385..8280709446a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -133,7 +133,8 @@ public class IoTDBSchemaRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
- pipeSchemaRegionSnapshotEvent.toString());
+ pipeSchemaRegionSnapshotEvent.toString(),
+ true);
} else {
LOGGER.info(
"Successfully transferred schema region snapshot {} and {}.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
index 94333e824d6..6abbb70dc51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
@@ -145,7 +145,7 @@ public class IoTDBSchemaRegionSink extends
IoTDBDataNodeSyncSink {
String.format(
"Seal file %s and %s error, result status %s.",
mTreeSnapshotFile, tagLogSnapshotFile, resp.getStatus()),
- snapshotEvent.toString());
+ snapshotEvent.toString(), true);
}
LOGGER.info("Successfully transferred file {} and {}.", mTreeSnapshotFile,
tagLogSnapshotFile);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 71be50163fe..3cf9a4e7f03 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigur
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.RetryUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -42,7 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class PipeReceiverStatusHandler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
+ private static Logger LOGGER =
LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
private static final String NO_PERMISSION = "No permission";
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified
exception";
private static final String NO_PERMISSION_STR = "No permissions for this
operation";
@@ -84,6 +86,11 @@ public class PipeReceiverStatusHandler {
this.skipIfNoPrivileges = skipIfNoPrivileges;
}
+ public void handle(
+ final TSStatus status, final String exceptionMessage, final String
recordMessage) {
+ handle(status, exceptionMessage, recordMessage, false);
+ }
+
/**
* Handle {@link TSStatus} returned by receiver. Do nothing if ignore the
{@link Event}, and throw
* exception if retry the {@link Event}. Upper class must ensure that the
method is invoked only
@@ -97,7 +104,7 @@ public class PipeReceiverStatusHandler {
* put any time-related info here
*/
public void handle(
- final TSStatus status, final String exceptionMessage, final String
recordMessage) {
+ final TSStatus status, final String exceptionMessage, final String
recordMessage, final boolean log4NoPrivileges) {
switch (status.getCode()) {
case 200: // SUCCESS_STATUS
case 400: // REDIRECTION_RECOMMEND
@@ -170,17 +177,28 @@ public class PipeReceiverStatusHandler {
case 803: // NO_PERMISSION
if (skipIfNoPrivileges) {
+ if (log4NoPrivileges && LOGGER.isWarnEnabled()) {
+ LOGGER.warn(
+ "{}: Skip if no privileges. will be ignored. event: {}.
status: {}",
+ getNoPermission(true),
+ shouldRecordIgnoredDataWhenOtherExceptionsOccur ?
recordMessage : "not recorded",
+ status);
+ }
return;
}
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
break;
- case 305:
- handleOtherExceptions(status, exceptionMessage, recordMessage, false);
- break;
default:
// Some auth error may be wrapped in other codes
if (exceptionMessage.contains(NO_PERMISSION_STR)) {
if (skipIfNoPrivileges) {
+ if (log4NoPrivileges && LOGGER.isWarnEnabled()) {
+ LOGGER.warn(
+ "{}: Skip if no privileges. will be ignored. event: {}.
status: {}",
+ getNoPermission(true),
+ shouldRecordIgnoredDataWhenOtherExceptionsOccur ?
recordMessage : "not recorded",
+ status);
+ }
return;
}
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
@@ -300,4 +318,9 @@ public class PipeReceiverStatusHandler {
resultStatus.setSubStatus(givenStatusList);
return resultStatus;
}
+
+ @TestOnly
+ public static void setLogger(final Logger logger) {
+ LOGGER = logger;
+ }
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java
new file mode 100644
index 00000000000..38d2e015165
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
+
+public class PipeReceiverStatusHandlerTest {
+ @Test
+ public void testAuthLogger() {
+ final PipeReceiverStatusHandler handler =
+ new PipeReceiverStatusHandler(
+
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE.equals("retry"),
+ CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE,
+ CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE,
+ CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE,
+ CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE,
+ true);
+ PipeReceiverStatusHandler.setLogger(
+ new Logger() {
+ @Override
+ public String getName() {
+ return null;
+ }
+
+ @Override
+ public boolean isTraceEnabled() {
+ return false;
+ }
+
+ @Override
+ public void trace(String msg) {}
+
+ @Override
+ public void trace(String format, Object arg) {}
+
+ @Override
+ public void trace(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void trace(String format, Object... arguments) {}
+
+ @Override
+ public void trace(String msg, Throwable t) {}
+
+ @Override
+ public boolean isTraceEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void trace(Marker marker, String msg) {}
+
+ @Override
+ public void trace(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void trace(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void trace(Marker marker, String format, Object... argArray)
{}
+
+ @Override
+ public void trace(Marker marker, String msg, Throwable t) {}
+
+ @Override
+ public boolean isDebugEnabled() {
+ return false;
+ }
+
+ @Override
+ public void debug(String msg) {}
+
+ @Override
+ public void debug(String format, Object arg) {}
+
+ @Override
+ public void debug(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void debug(String format, Object... arguments) {}
+
+ @Override
+ public void debug(String msg, Throwable t) {}
+
+ @Override
+ public boolean isDebugEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void debug(Marker marker, String msg) {}
+
+ @Override
+ public void debug(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void debug(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void debug(Marker marker, String format, Object... arguments)
{}
+
+ @Override
+ public void debug(Marker marker, String msg, Throwable t) {}
+
+ @Override
+ public boolean isInfoEnabled() {
+ return false;
+ }
+
+ @Override
+ public void info(String msg) {}
+
+ @Override
+ public void info(String format, Object arg) {}
+
+ @Override
+ public void info(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void info(String format, Object... arguments) {}
+
+ @Override
+ public void info(String msg, Throwable t) {}
+
+ @Override
+ public boolean isInfoEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void info(Marker marker, String msg) {}
+
+ @Override
+ public void info(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void info(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void info(Marker marker, String format, Object... arguments)
{}
+
+ @Override
+ public void info(Marker marker, String msg, Throwable t) {}
+
+ // Warn
+ @Override
+ public boolean isWarnEnabled() {
+ return true;
+ }
+
+ @Override
+ public void warn(String msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(String format, Object arg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(String format, Object... arguments) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(String format, Object arg1, Object arg2) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(String msg, Throwable t) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isWarnEnabled(Marker marker) {
+ return true;
+ }
+
+ @Override
+ public void warn(Marker marker, String msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(Marker marker, String format, Object arg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(Marker marker, String format, Object arg1, Object
arg2) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(Marker marker, String format, Object... arguments) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void warn(Marker marker, String msg, Throwable t) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isErrorEnabled() {
+ return false;
+ }
+
+ @Override
+ public void error(String msg) {}
+
+ @Override
+ public void error(String format, Object arg) {}
+
+ @Override
+ public void error(String format, Object arg1, Object arg2) {}
+
+ @Override
+ public void error(String format, Object... arguments) {}
+
+ @Override
+ public void error(String msg, Throwable t) {}
+
+ @Override
+ public boolean isErrorEnabled(Marker marker) {
+ return false;
+ }
+
+ @Override
+ public void error(Marker marker, String msg) {}
+
+ @Override
+ public void error(Marker marker, String format, Object arg) {}
+
+ @Override
+ public void error(Marker marker, String format, Object arg1, Object
arg2) {}
+
+ @Override
+ public void error(Marker marker, String format, Object... arguments)
{}
+
+ @Override
+ public void error(Marker marker, String msg, Throwable t) {}
+ });
+ handler.handle(
+ new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()),
+ "",
+ "");
+ handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()),
"", "");
+ try {
+ handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()),
"", "", true);
+ Assert.fail();
+ } catch (final UnsupportedOperationException e) {
+ // Expected
+ }
+ handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()),
"", "");
+ try {
+ handler.handle(
+ new TSStatus(TSStatusCode.METADATA_ERROR.getStatusCode())
+ .setMessage("No permissions for this operation, please add
privilege WRITE_DATA"),
+ "",
+ "",
+ true);
+ Assert.fail();
+ } catch (final UnsupportedOperationException e) {
+ // Expected
+ }
+
PipeReceiverStatusHandler.setLogger(LoggerFactory.getLogger(PipeReceiverStatusHandler.class));
+ }
+}