This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba29144921 Pipe: Mask sensitive attributes in sink subtask display strings (#17737)
2ba29144921 is described below

commit 2ba291449219c500ab05a2f61bf2a7022713c5e5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 28 18:39:31 2026 +0800

    Pipe: Mask sensitive attributes in sink subtask display strings (#17737)
    
    * Pipe: Mask sensitive attributes in sink subtask display strings
    
    Use the masked PipeParameters display string for logs, metrics, and subtask
    names, while keeping the unmasked sorted string for internal lifecycle map
    keys. Also treat scp.password as a sensitive parameter.
    
    Co-authored-by: Cursor <[email protected]>
    
    * Pipe: Fix sink compression timer keying and masked error paths
    
    Key compressionTimerMap by the per-subtask taskID instead of the masked
    attribute string to avoid collisions when only sensitive fields differ. Use
    masked display strings in subtask-not-found exceptions, and pass sinkTaskId
    from the runtime environment to the IoTDB sinks for timer lookup.
    
    Co-authored-by: Cursor <[email protected]>
    
    ---------
    
    Co-authored-by: Cursor <[email protected]>
---
 .../api/customizer/parameter/PipeParameters.java   |  1 +
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 54 ++++++++++++++++++----
 .../metric/sink/PipeDataRegionSinkMetrics.java     | 10 ++--
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  5 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  5 +-
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  5 +-
 .../plugin/env/PipeTaskSinkRuntimeEnvironment.java |  9 ++++
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  7 ++-
 8 files changed, 70 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index c13f87ae579..f9ef2a64a83 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -429,6 +429,7 @@ public class PipeParameters {
 
     static {
       KEYS.add("ssl.trust-store-pwd");
+      KEYS.add("scp.password");
       KEYS.add("password");
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 3ad99ca5c06..01552eec5ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -64,6 +64,8 @@ public class PipeSinkSubtaskManager {
   private final Map<String, List<PipeSinkSubtaskLifeCycle>>
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
+  private final Map<String, String> attributeSortedString2DisplayString = new 
HashMap<>();
+
   public synchronized String register(
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
       final PipeParameters pipeSinkParameters,
@@ -92,6 +94,7 @@ public class PipeSinkSubtaskManager {
     final int sinkNum;
     boolean realTimeFirst = false;
     String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
+    final String attributeDisplayString = 
generateAttributeDisplayString(pipeSinkParameters);
     if (isDataRegionSink) {
       sinkNum =
           pipeSinkParameters.getIntOrDefault(
@@ -120,7 +123,9 @@ public class PipeSinkSubtaskManager {
       sinkNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
-    environment.setAttributeSortedString(attributeSortedString);
+    final String attributeDisplayStringWithPrefix =
+        isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + 
attributeDisplayString;
+    environment.setAttributeSortedString(attributeDisplayStringWithPrefix);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final PipeSinkSubtaskExecutor executor = executorSupplier.get();
@@ -138,6 +143,12 @@ public class PipeSinkSubtaskManager {
       }
 
       for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
+        final String taskID =
+            String.format(
+                "%s_%s_%s",
+                attributeDisplayStringWithPrefix, 
environment.getCreationTime(), sinkIndex);
+        environment.setSinkTaskId(taskID);
+
         final PipeConnector pipeSink =
             isDataRegionSink
                 ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
@@ -168,10 +179,9 @@ public class PipeSinkSubtaskManager {
         // 2. Construct PipeConnectorSubtaskLifeCycle to manage 
PipeConnectorSubtask's life cycle
         final PipeSinkSubtask pipeSinkSubtask =
             new PipeSinkSubtask(
-                String.format(
-                    "%s_%s_%s", attributeSortedString, 
environment.getCreationTime(), sinkIndex),
+                taskID,
                 environment.getCreationTime(),
-                attributeSortedString,
+                attributeDisplayStringWithPrefix,
                 sinkIndex,
                 pendingQueue,
                 pipeSink);
@@ -182,11 +192,13 @@ public class PipeSinkSubtaskManager {
 
       LOGGER.info(
           DataNodePipeMessages.PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED,
-          attributeSortedString,
+          attributeDisplayStringWithPrefix,
           executor.getWorkingThreadName(),
           executor.getCallbackThreadName());
       attributeSortedString2SubtaskLifeCycleMap.put(
           attributeSortedString, pipeSinkSubtaskLifeCycleList);
+      attributeSortedString2DisplayString.put(
+          attributeSortedString, attributeDisplayStringWithPrefix);
     }
 
     for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -203,7 +215,7 @@ public class PipeSinkSubtaskManager {
       final int regionId,
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
+      throwNoSuchSubtaskException(attributeSortedString);
     }
 
     final List<PipeSinkSubtaskLifeCycle> lifeCycles =
@@ -219,6 +231,7 @@ public class PipeSinkSubtaskManager {
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
+      attributeSortedString2DisplayString.remove(attributeSortedString);
       executor.shutdown();
       LOGGER.info(
           DataNodePipeMessages.THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN,
@@ -234,7 +247,7 @@ public class PipeSinkSubtaskManager {
 
   public synchronized void start(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
+      throwNoSuchSubtaskException(attributeSortedString);
     }
 
     for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -245,7 +258,7 @@ public class PipeSinkSubtaskManager {
 
   public synchronized void stop(final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
+      throwNoSuchSubtaskException(attributeSortedString);
     }
 
     for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -258,7 +271,8 @@ public class PipeSinkSubtaskManager {
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
-          DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + 
attributeSortedString);
+          DataNodePipeMessages.FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK
+              + getDisplayStringForException(attributeSortedString));
     }
 
     // All subtasks share the same pending queue
@@ -268,13 +282,33 @@ public class PipeSinkSubtaskManager {
         .getPendingQueue();
   }
 
-  private String generateAttributeSortedString(final PipeParameters 
pipeConnectorParameters) {
+  private static String generateAttributeSortedString(
+      final PipeParameters pipeConnectorParameters) {
     final TreeMap<String, String> sortedStringSourceMap =
         new TreeMap<>(pipeConnectorParameters.getAttribute());
     sortedStringSourceMap.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
     return sortedStringSourceMap.toString();
   }
 
+  /** Masked attribute string for logs, metrics and exception messages. */
+  private static String generateAttributeDisplayString(
+      final PipeParameters pipeConnectorParameters) {
+    final TreeMap<String, String> filteredAttributes =
+        new TreeMap<>(pipeConnectorParameters.getAttribute());
+    filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY);
+    return new PipeParameters(filteredAttributes).toString();
+  }
+
+  private void throwNoSuchSubtaskException(final String attributeSortedString) 
{
+    throw new PipeException(
+        FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE
+            + getDisplayStringForException(attributeSortedString));
+  }
+
+  private String getDisplayStringForException(final String 
attributeSortedString) {
+    return 
attributeSortedString2DisplayString.getOrDefault(attributeSortedString, 
"unknown");
+  }
+
   /////////////////////////  Singleton Instance Holder  
/////////////////////////
 
   private PipeSinkSubtaskManager() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index dd7707d1b96..9b2f876ff2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -199,8 +199,8 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
 
   private void createTimer(final String taskID) {
     final PipeSinkSubtask sink = sinkMap.get(taskID);
-    compressionTimerMap.putIfAbsent(
-        sink.getAttributeSortedString(),
+    compressionTimerMap.put(
+        taskID,
         metricService.getOrCreateTimer(
             Metric.PIPE_COMPRESSION_TIME.toString(),
             MetricLevel.IMPORTANT,
@@ -394,7 +394,7 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
         sink.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
         String.valueOf(sink.getCreationTime()));
-    compressionTimerMap.remove(sink.getAttributeSortedString());
+    compressionTimerMap.remove(taskID);
   }
 
   private void removeHistogram(final String taskID) {
@@ -492,8 +492,8 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     rate.mark();
   }
 
-  public Timer getCompressionTimer(final String attributeSortedString) {
-    return Objects.isNull(metricService) ? null : 
compressionTimerMap.get(attributeSortedString);
+  public Timer getCompressionTimer(final String taskID) {
+    return Objects.isNull(metricService) ? null : 
compressionTimerMap.get(taskID);
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index ea83524988b..4f2dab1bfa8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -604,9 +604,8 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
 
   @Override
   protected byte[] compressIfNeeded(final byte[] reqInBytes) throws 
IOException {
-    if (Objects.isNull(compressionTimer) && 
Objects.nonNull(attributeSortedString)) {
-      compressionTimer =
-          
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
+      compressionTimer = 
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
     }
     return super.compressIfNeeded(reqInBytes);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index c607490b362..bd3f06ba778 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -508,9 +508,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
   @Override
   public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
-    if (Objects.isNull(compressionTimer) && 
Objects.nonNull(attributeSortedString)) {
-      compressionTimer =
-          
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
+      compressionTimer = 
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
     }
     return super.compressIfNeeded(req);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index d9e25f5e09f..eb7d39864c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -595,9 +595,8 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
 
   @Override
   public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws 
IOException {
-    if (Objects.isNull(compressionTimer) && 
Objects.nonNull(attributeSortedString)) {
-      compressionTimer =
-          
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) {
+      compressionTimer = 
PipeDataRegionSinkMetrics.getInstance().getCompressionTimer(sinkTaskId);
     }
     return super.compressIfNeeded(req);
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
index b8382891348..26081d9c78a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.config.plugin.env;
 
 public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment 
{
   private String attributeSortedString;
+  private String sinkTaskId;
 
   public PipeTaskSinkRuntimeEnvironment(
       final String pipeName, final long creationTime, final int regionId) {
@@ -34,4 +35,12 @@ public class PipeTaskSinkRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
   public void setAttributeSortedString(String attributeSortedString) {
     this.attributeSortedString = attributeSortedString;
   }
+
+  public String getSinkTaskId() {
+    return sinkTaskId;
+  }
+
+  public void setSinkTaskId(final String sinkTaskId) {
+    this.sinkTaskId = sinkTaskId;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index a52779650f8..b5662aeec2c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -189,6 +189,7 @@ public abstract class IoTDBSink implements PipeConnector, 
PipeConnectorWithEvent
   private final AtomicLong totalUncompressedSize = new AtomicLong(0);
   private final AtomicLong totalCompressedSize = new AtomicLong(0);
   protected String attributeSortedString;
+  protected String sinkTaskId;
   protected Timer compressionTimer;
   protected boolean isRealtimeFirst;
 
@@ -391,8 +392,10 @@ public abstract class IoTDBSink implements PipeConnector, 
PipeConnectorWithEvent
       throws Exception {
     final PipeRuntimeEnvironment environment = 
configuration.getRuntimeEnvironment();
     if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
-      attributeSortedString =
-          ((PipeTaskSinkRuntimeEnvironment) 
environment).getAttributeSortedString();
+      final PipeTaskSinkRuntimeEnvironment sinkEnvironment =
+          (PipeTaskSinkRuntimeEnvironment) environment;
+      attributeSortedString = sinkEnvironment.getAttributeSortedString();
+      sinkTaskId = sinkEnvironment.getSinkTaskId();
     }
 
     nodeUrls.clear();

Reply via email to