This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0a79b6d2a8d768e0fde59b924a794e8c84c8d364 Author: Caideyipi <[email protected]> AuthorDate: Wed Nov 26 15:01:16 2025 +0800 Pipe: Fixed the check for no permission 2 (#16804) * edge * sonar * emperor-cloth * emperor-patch (cherry picked from commit 448592e3c8db1afd67bf4b2e6a60c4f846781ed1) --- .../sink/protocol/IoTDBConfigRegionAirGapSink.java | 6 +- .../pipe/sink/protocol/IoTDBConfigRegionSink.java | 6 +- .../airgap/IoTDBSchemaRegionAirGapSink.java | 6 +- .../handler/PipeConsensusDeleteEventHandler.java | 2 +- .../thrift/sync/IoTDBDataRegionSyncSink.java | 3 +- .../thrift/sync/IoTDBSchemaRegionSink.java | 6 +- .../pipe/receiver/PipeReceiverStatusHandler.java | 35 ++- .../PipeReceiverStatusHandlerTest.java | 302 +++++++++++++++++++++ 8 files changed, 351 insertions(+), 15 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java index 83c03ca9cd0..882c33cb657 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java @@ -191,7 +191,8 @@ public class IoTDBConfigRegionAirGapSink extends IoTDBAirGapSink { new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeConfigRegionWritePlanEvent.toString()); + pipeConfigRegionWritePlanEvent.toString(), + true); } } @@ -252,7 +253,8 @@ public class IoTDBConfigRegionAirGapSink extends IoTDBAirGapSink { new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeConfigRegionSnapshotEvent.toString()); + pipeConfigRegionSnapshotEvent.toString(), + true); } else { LOGGER.info("Successfully transferred config region snapshot {}.", snapshot); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java index 5866839af17..5ce03f02598 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java @@ -188,7 +188,8 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink { String.format( "Transfer config region write plan %s error, result status %s.", pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status), - pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString()); + pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString(), + true); } if (LOGGER.isDebugEnabled()) { @@ -279,7 +280,8 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink { String.format( "Seal config region snapshot file %s error, result status %s.", snapshotFile, resp.getStatus()), - snapshotFile.toString()); + snapshotFile.toString(), + true); } LOGGER.info("Successfully transferred config region snapshot {}.", snapshotFile); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java index 3634b2396a4..e4d39b49523 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java @@ -122,7 +122,8 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink { new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeSchemaRegionWritePlanEvent.toString()); + pipeSchemaRegionWritePlanEvent.toString(), + true); } } @@ -187,7 +188,8 @@ public class IoTDBSchemaRegionAirGapSink extends IoTDBDataNodeAirGapSink { new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), errorMessage, - pipeSchemaRegionSnapshotEvent.toString()); + pipeSchemaRegionSnapshotEvent.toString(), + true); } else { LOGGER.info( "Successfully transferred schema region snapshot {}, {} and {}.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java index 6f4e93ba445..2f39ffab39d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java @@ -79,7 +79,7 @@ public class PipeConsensusDeleteEventHandler // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - connector.statusHandler().handle(status, status.getMessage(), event.toString()); + connector.statusHandler().handle(status, status.getMessage(), event.toString(), true); } event.decreaseReferenceCount(PipeConsensusDeleteEventHandler.class.getName(), true); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 27ad636db38..016f787afaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -253,7 +253,8 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { String.format( "Transfer deletion %s error, result status %s.", pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), status), - pipeDeleteDataNodeEvent.getDeletionResource().toString()); + pipeDeleteDataNodeEvent.getDeletionResource().toString(), + true); } if (LOGGER.isDebugEnabled()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java index e1e1e7868d7..e001367a4c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java @@ -127,7 +127,8 @@ public class IoTDBSchemaRegionSink extends IoTDBDataNodeSyncSink { String.format( "Transfer data node write plan %s error, result status %s.", pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status), - pipeSchemaRegionWritePlanEvent.getPlanNode().toString()); + pipeSchemaRegionWritePlanEvent.getPlanNode().toString(), + true); } if (LOGGER.isDebugEnabled()) { @@ -222,7 +223,8 @@ public class IoTDBSchemaRegionSink extends IoTDBDataNodeSyncSink { String.format( "Seal file %s, %s and %s error, result status %s.", mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, resp.getStatus()), - snapshotEvent.toString()); + snapshotEvent.toString(), + true); } LOGGER.info( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 9fe87b22fd7..886d387f60c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.utils.RetryUtils; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -44,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReference; public class PipeReceiverStatusHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class); + private static Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class); private static final String NO_PERMISSION = "No permission"; private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception"; private static final String NO_PERMISSION_STR = "No permissions for this operation"; @@ -86,6 +87,11 @@ public class PipeReceiverStatusHandler { this.skipIfNoPrivileges = skipIfNoPrivileges; } + public void handle( + final TSStatus status, final String exceptionMessage, final String recordMessage) { + handle(status, exceptionMessage, recordMessage, false); + } + /** * Handle {@link TSStatus} returned by receiver. Do nothing if ignore the {@link Event}, and throw * exception if retry the {@link Event}. Upper class must ensure that the method is invoked only @@ -99,7 +105,10 @@ public class PipeReceiverStatusHandler { * put any time-related info here */ public void handle( - final TSStatus status, final String exceptionMessage, final String recordMessage) { + final TSStatus status, + final String exceptionMessage, + final String recordMessage, + final boolean log4NoPrivileges) { if (RetryUtils.needRetryForWrite(status.getCode())) { LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status); @@ -184,17 +193,28 @@ public class PipeReceiverStatusHandler { case 803: // NO_PERMISSION if (skipIfNoPrivileges) { + if (log4NoPrivileges && LOGGER.isWarnEnabled()) { + LOGGER.warn( + "{}: Skip if no privileges. will be ignored. event: {}. status: {}", + getNoPermission(true), + shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", + status); + } return; } handleOtherExceptions(status, exceptionMessage, recordMessage, true); break; - case 305: - handleOtherExceptions(status, exceptionMessage, recordMessage, false); - break; default: // Some auth error may be wrapped in other codes if (exceptionMessage.contains(NO_PERMISSION_STR)) { if (skipIfNoPrivileges) { + if (log4NoPrivileges && LOGGER.isWarnEnabled()) { + LOGGER.warn( + "{}: Skip if no privileges. will be ignored. event: {}. status: {}", + getNoPermission(true), + shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded", + status); + } return; } handleOtherExceptions(status, exceptionMessage, recordMessage, true); @@ -314,4 +334,9 @@ public class PipeReceiverStatusHandler { resultStatus.setSubStatus(givenStatusList); return resultStatus; } + + @TestOnly + public static void setLogger(final Logger logger) { + LOGGER = logger; + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java new file mode 100644 index 00000000000..38d2e015165 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.datastructure; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE; + +public class PipeReceiverStatusHandlerTest { + @Test + public void testAuthLogger() { + final PipeReceiverStatusHandler handler = + new PipeReceiverStatusHandler( + CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE.equals("retry"), + CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE, + CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE, + CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE, + CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE, + true); + PipeReceiverStatusHandler.setLogger( + new Logger() { + @Override + public String getName() { + return null; + } + + @Override + public boolean isTraceEnabled() { + return false; + } + + @Override + public void trace(String msg) {} + + @Override + public void trace(String format, Object arg) {} + + @Override + public void trace(String format, Object arg1, Object arg2) {} + + @Override + public void trace(String format, Object... arguments) {} + + @Override + public void trace(String msg, Throwable t) {} + + @Override + public boolean isTraceEnabled(Marker marker) { + return false; + } + + @Override + public void trace(Marker marker, String msg) {} + + @Override + public void trace(Marker marker, String format, Object arg) {} + + @Override + public void trace(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void trace(Marker marker, String format, Object... argArray) {} + + @Override + public void trace(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isDebugEnabled() { + return false; + } + + @Override + public void debug(String msg) {} + + @Override + public void debug(String format, Object arg) {} + + @Override + public void debug(String format, Object arg1, Object arg2) {} + + @Override + public void debug(String format, Object... arguments) {} + + @Override + public void debug(String msg, Throwable t) {} + + @Override + public boolean isDebugEnabled(Marker marker) { + return false; + } + + @Override + public void debug(Marker marker, String msg) {} + + @Override + public void debug(Marker marker, String format, Object arg) {} + + @Override + public void debug(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void debug(Marker marker, String format, Object... arguments) {} + + @Override + public void debug(Marker marker, String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled() { + return false; + } + + @Override + public void info(String msg) {} + + @Override + public void info(String format, Object arg) {} + + @Override + public void info(String format, Object arg1, Object arg2) {} + + @Override + public void info(String format, Object... arguments) {} + + @Override + public void info(String msg, Throwable t) {} + + @Override + public boolean isInfoEnabled(Marker marker) { + return false; + } + + @Override + public void info(Marker marker, String msg) {} + + @Override + public void info(Marker marker, String format, Object arg) {} + + @Override + public void info(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void info(Marker marker, String format, Object... arguments) {} + + @Override + public void info(Marker marker, String msg, Throwable t) {} + + // Warn + @Override + public boolean isWarnEnabled() { + return true; + } + + @Override + public void warn(String msg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String format, Object arg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String format, Object... arguments) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String format, Object arg1, Object arg2) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(String msg, Throwable t) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWarnEnabled(Marker marker) { + return true; + } + + @Override + public void warn(Marker marker, String msg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String format, Object arg) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String format, Object arg1, Object arg2) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String format, Object... arguments) { + throw new UnsupportedOperationException(); + } + + @Override + public void warn(Marker marker, String msg, Throwable t) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isErrorEnabled() { + return false; + } + + @Override + public void error(String msg) {} + + @Override + public void error(String format, Object arg) {} + + @Override + public void error(String format, Object arg1, Object arg2) {} + + @Override + public void error(String format, Object... arguments) {} + + @Override + public void error(String msg, Throwable t) {} + + @Override + public boolean isErrorEnabled(Marker marker) { + return false; + } + + @Override + public void error(Marker marker, String msg) {} + + @Override + public void error(Marker marker, String format, Object arg) {} + + @Override + public void error(Marker marker, String format, Object arg1, Object arg2) {} + + @Override + public void error(Marker marker, String format, Object... arguments) {} + + @Override + public void error(Marker marker, String msg, Throwable t) {} + }); + handler.handle( + new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()), + "", + ""); + handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), "", ""); + try { + handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), "", "", true); + Assert.fail(); + } catch (final UnsupportedOperationException e) { + // Expected + } + handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), "", ""); + try { + handler.handle( + new TSStatus(TSStatusCode.METADATA_ERROR.getStatusCode()) + .setMessage("No permissions for this operation, please add privilege WRITE_DATA"), + "", + "", + true); + Assert.fail(); + } catch (final UnsupportedOperationException e) { + // Expected + } + PipeReceiverStatusHandler.setLogger(LoggerFactory.getLogger(PipeReceiverStatusHandler.class)); + } +}
