This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch auth-fix-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/auth-fix-2 by this push:
     new fead66e2cc7 Pipe: Fixed the check for no permission 2 (#16804)
fead66e2cc7 is described below

commit fead66e2cc7caad2d51bc7fdf0524d7c635d04eb
Author: Caideyipi <[email protected]>
AuthorDate: Wed Nov 26 15:01:16 2025 +0800

    Pipe: Fixed the check for no permission 2 (#16804)
    
    * edge
    
    * sonar
    
    * emperor-cloth
    
    * emperor-patch
---
 .../sink/protocol/IoTDBConfigRegionAirGapSink.java |   6 +-
 .../pipe/sink/protocol/IoTDBConfigRegionSink.java  |   6 +-
 .../airgap/IoTDBSchemaRegionAirGapSink.java        |   3 +-
 .../thrift/sync/IoTDBSchemaRegionSink.java         |   2 +-
 .../pipe/receiver/PipeReceiverStatusHandler.java   |  33 ++-
 .../PipeReceiverStatusHandlerTest.java             | 302 +++++++++++++++++++++
 6 files changed, 341 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
index 97413d68beb..e1f66733518 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java
@@ -183,7 +183,8 @@ public class IoTDBConfigRegionAirGapSink extends 
IoTDBAirGapSink {
           new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
               .setMessage(errorMessage),
           errorMessage,
-          pipeConfigRegionWritePlanEvent.toString());
+          pipeConfigRegionWritePlanEvent.toString(),
+          true);
     }
   }
 
@@ -239,7 +240,8 @@ public class IoTDBConfigRegionAirGapSink extends 
IoTDBAirGapSink {
           new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
               .setMessage(errorMessage),
           errorMessage,
-          pipeConfigRegionSnapshotEvent.toString());
+          pipeConfigRegionSnapshotEvent.toString(),
+          true);
     } else {
       LOGGER.info("Successfully transferred config region snapshot {}.", 
snapshot);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
index d3555e62dfa..1b508eb2f5c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java
@@ -179,7 +179,8 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink 
{
           String.format(
               "Transfer config region write plan %s error, result status %s.",
               
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status),
-          pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString());
+          pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString(),
+          true);
     }
 
     if (LOGGER.isDebugEnabled()) {
@@ -265,7 +266,8 @@ public class IoTDBConfigRegionSink extends IoTDBSslSyncSink 
{
           String.format(
               "Seal config region snapshot file %s error, result status %s.",
               snapshotFile, resp.getStatus()),
-          snapshotFile.toString());
+          snapshotFile.toString(),
+          true);
     }
 
     LOGGER.info("Successfully transferred config region snapshot {}.", 
snapshotFile);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index bcbf88d2385..8280709446a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -133,7 +133,8 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
           new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
               .setMessage(errorMessage),
           errorMessage,
-          pipeSchemaRegionSnapshotEvent.toString());
+          pipeSchemaRegionSnapshotEvent.toString(),
+          true);
     } else {
       LOGGER.info(
           "Successfully transferred schema region snapshot {} and {}.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
index 94333e824d6..6abbb70dc51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
@@ -145,7 +145,7 @@ public class IoTDBSchemaRegionSink extends 
IoTDBDataNodeSyncSink {
           String.format(
               "Seal file %s and %s error, result status %s.",
               mTreeSnapshotFile, tagLogSnapshotFile, resp.getStatus()),
-          snapshotEvent.toString());
+          snapshotEvent.toString(), true);
     }
 
     LOGGER.info("Successfully transferred file {} and {}.", mTreeSnapshotFile, 
tagLogSnapshotFile);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 71be50163fe..3cf9a4e7f03 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigur
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.RetryUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -42,7 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeReceiverStatusHandler {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
+  private static Logger LOGGER = 
LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
   private static final String NO_PERMISSION = "No permission";
   private static final String UNCLASSIFIED_EXCEPTION = "Unclassified 
exception";
   private static final String NO_PERMISSION_STR = "No permissions for this 
operation";
@@ -84,6 +86,11 @@ public class PipeReceiverStatusHandler {
     this.skipIfNoPrivileges = skipIfNoPrivileges;
   }
 
+  public void handle(
+      final TSStatus status, final String exceptionMessage, final String 
recordMessage) {
+    handle(status, exceptionMessage, recordMessage, false);
+  }
+
   /**
    * Handle {@link TSStatus} returned by receiver. Do nothing if ignore the 
{@link Event}, and throw
    * exception if retry the {@link Event}. Upper class must ensure that the 
method is invoked only
@@ -97,7 +104,7 @@ public class PipeReceiverStatusHandler {
    *     put any time-related info here
    */
   public void handle(
-      final TSStatus status, final String exceptionMessage, final String 
recordMessage) {
+      final TSStatus status, final String exceptionMessage, final String 
recordMessage, final boolean log4NoPrivileges) {
     switch (status.getCode()) {
       case 200: // SUCCESS_STATUS
       case 400: // REDIRECTION_RECOMMEND
@@ -170,17 +177,28 @@ public class PipeReceiverStatusHandler {
 
       case 803: // NO_PERMISSION
         if (skipIfNoPrivileges) {
+          if (log4NoPrivileges && LOGGER.isWarnEnabled()) {
+            LOGGER.warn(
+                "{}: Skip if no privileges. will be ignored. event: {}. 
status: {}",
+                getNoPermission(true),
+                shouldRecordIgnoredDataWhenOtherExceptionsOccur ? 
recordMessage : "not recorded",
+                status);
+          }
           return;
         }
         handleOtherExceptions(status, exceptionMessage, recordMessage, true);
         break;
-      case 305:
-        handleOtherExceptions(status, exceptionMessage, recordMessage, false);
-        break;
       default:
         // Some auth error may be wrapped in other codes
         if (exceptionMessage.contains(NO_PERMISSION_STR)) {
           if (skipIfNoPrivileges) {
+            if (log4NoPrivileges && LOGGER.isWarnEnabled()) {
+              LOGGER.warn(
+                  "{}: Skip if no privileges. will be ignored. event: {}. 
status: {}",
+                  getNoPermission(true),
+                  shouldRecordIgnoredDataWhenOtherExceptionsOccur ? 
recordMessage : "not recorded",
+                  status);
+            }
             return;
           }
           handleOtherExceptions(status, exceptionMessage, recordMessage, true);
@@ -300,4 +318,9 @@ public class PipeReceiverStatusHandler {
     resultStatus.setSubStatus(givenStatusList);
     return resultStatus;
   }
+
+  @TestOnly
+  public static void setLogger(final Logger logger) {
+    LOGGER = logger;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java
new file mode 100644
index 00000000000..38d2e015165
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/PipeReceiverStatusHandlerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
+
+public class PipeReceiverStatusHandlerTest {
+  @Test
+  public void testAuthLogger() {
+    final PipeReceiverStatusHandler handler =
+        new PipeReceiverStatusHandler(
+            
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE.equals("retry"),
+            CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE,
+            CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE,
+            CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE,
+            CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE,
+            true);
+    PipeReceiverStatusHandler.setLogger(
+        new Logger() {
+          @Override
+          public String getName() {
+            return null;
+          }
+
+          @Override
+          public boolean isTraceEnabled() {
+            return false;
+          }
+
+          @Override
+          public void trace(String msg) {}
+
+          @Override
+          public void trace(String format, Object arg) {}
+
+          @Override
+          public void trace(String format, Object arg1, Object arg2) {}
+
+          @Override
+          public void trace(String format, Object... arguments) {}
+
+          @Override
+          public void trace(String msg, Throwable t) {}
+
+          @Override
+          public boolean isTraceEnabled(Marker marker) {
+            return false;
+          }
+
+          @Override
+          public void trace(Marker marker, String msg) {}
+
+          @Override
+          public void trace(Marker marker, String format, Object arg) {}
+
+          @Override
+          public void trace(Marker marker, String format, Object arg1, Object 
arg2) {}
+
+          @Override
+          public void trace(Marker marker, String format, Object... argArray) 
{}
+
+          @Override
+          public void trace(Marker marker, String msg, Throwable t) {}
+
+          @Override
+          public boolean isDebugEnabled() {
+            return false;
+          }
+
+          @Override
+          public void debug(String msg) {}
+
+          @Override
+          public void debug(String format, Object arg) {}
+
+          @Override
+          public void debug(String format, Object arg1, Object arg2) {}
+
+          @Override
+          public void debug(String format, Object... arguments) {}
+
+          @Override
+          public void debug(String msg, Throwable t) {}
+
+          @Override
+          public boolean isDebugEnabled(Marker marker) {
+            return false;
+          }
+
+          @Override
+          public void debug(Marker marker, String msg) {}
+
+          @Override
+          public void debug(Marker marker, String format, Object arg) {}
+
+          @Override
+          public void debug(Marker marker, String format, Object arg1, Object 
arg2) {}
+
+          @Override
+          public void debug(Marker marker, String format, Object... arguments) 
{}
+
+          @Override
+          public void debug(Marker marker, String msg, Throwable t) {}
+
+          @Override
+          public boolean isInfoEnabled() {
+            return false;
+          }
+
+          @Override
+          public void info(String msg) {}
+
+          @Override
+          public void info(String format, Object arg) {}
+
+          @Override
+          public void info(String format, Object arg1, Object arg2) {}
+
+          @Override
+          public void info(String format, Object... arguments) {}
+
+          @Override
+          public void info(String msg, Throwable t) {}
+
+          @Override
+          public boolean isInfoEnabled(Marker marker) {
+            return false;
+          }
+
+          @Override
+          public void info(Marker marker, String msg) {}
+
+          @Override
+          public void info(Marker marker, String format, Object arg) {}
+
+          @Override
+          public void info(Marker marker, String format, Object arg1, Object 
arg2) {}
+
+          @Override
+          public void info(Marker marker, String format, Object... arguments) 
{}
+
+          @Override
+          public void info(Marker marker, String msg, Throwable t) {}
+
+          // Warn
+          @Override
+          public boolean isWarnEnabled() {
+            return true;
+          }
+
+          @Override
+          public void warn(String msg) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(String format, Object arg) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(String format, Object... arguments) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(String format, Object arg1, Object arg2) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(String msg, Throwable t) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isWarnEnabled(Marker marker) {
+            return true;
+          }
+
+          @Override
+          public void warn(Marker marker, String msg) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(Marker marker, String format, Object arg) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(Marker marker, String format, Object arg1, Object 
arg2) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(Marker marker, String format, Object... arguments) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void warn(Marker marker, String msg, Throwable t) {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isErrorEnabled() {
+            return false;
+          }
+
+          @Override
+          public void error(String msg) {}
+
+          @Override
+          public void error(String format, Object arg) {}
+
+          @Override
+          public void error(String format, Object arg1, Object arg2) {}
+
+          @Override
+          public void error(String format, Object... arguments) {}
+
+          @Override
+          public void error(String msg, Throwable t) {}
+
+          @Override
+          public boolean isErrorEnabled(Marker marker) {
+            return false;
+          }
+
+          @Override
+          public void error(Marker marker, String msg) {}
+
+          @Override
+          public void error(Marker marker, String format, Object arg) {}
+
+          @Override
+          public void error(Marker marker, String format, Object arg1, Object 
arg2) {}
+
+          @Override
+          public void error(Marker marker, String format, Object... arguments) 
{}
+
+          @Override
+          public void error(Marker marker, String msg, Throwable t) {}
+        });
+    handler.handle(
+        new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()),
+        "",
+        "");
+    handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), 
"", "");
+    try {
+      handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), 
"", "", true);
+      Assert.fail();
+    } catch (final UnsupportedOperationException e) {
+      // Expected
+    }
+    handler.handle(new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()), 
"", "");
+    try {
+      handler.handle(
+          new TSStatus(TSStatusCode.METADATA_ERROR.getStatusCode())
+              .setMessage("No permissions for this operation, please add 
privilege WRITE_DATA"),
+          "",
+          "",
+          true);
+      Assert.fail();
+    } catch (final UnsupportedOperationException e) {
+      // Expected
+    }
+    
PipeReceiverStatusHandler.setLogger(LoggerFactory.getLogger(PipeReceiverStatusHandler.class));
+  }
+}

Reply via email to