This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch norm
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b532062f186ae7b4dddcb86286fb50757bf2f650
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 22 10:02:05 2026 +0800

    Fix path traversal and missing permission checks in the legacy pipe receiver
---
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  40 ++++++-
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |  26 +++++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  53 +++++++--
 .../legacy/IoTDBLegacyPipeReceiverAgentTest.java   | 122 +++++++++++++++++++++
 4 files changed, 227 insertions(+), 14 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index e5769a5c6f6..c4c3986259f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -24,7 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.i18n.PipeMessages;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
 import org.apache.iotdb.commons.queryengine.common.SessionInfo;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -54,6 +56,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.file.Paths;
 import java.time.ZoneId;
 import java.util.Map;
 import java.util.Objects;
@@ -267,9 +270,11 @@ public class IoTDBLegacyPipeReceiverAgent {
    * @param tsFilePipeData pipeData
    * @param fileDir path of file data dir
    */
-  private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir) {
+  private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir)
+      throws IOException {
     final String tsFileName = tsFilePipeData.getTsFileName();
-    final File dir = new File(fileDir);
+    final File tsFile = resolveFileInFileDataDir(fileDir, tsFileName);
+    final File dir = tsFile.getParentFile();
     final File[] targetFiles =
         dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && name.endsWith(PATCH_SUFFIX));
     if (targetFiles != null) {
@@ -311,10 +316,18 @@ public class IoTDBLegacyPipeReceiverAgent {
     final String fileDir = getFileDataDir(identityInfo);
     final String fileName = metaInfo.fileName;
     final long startIndex = metaInfo.startIndex;
-    final File file = new File(fileDir, fileName + PATCH_SUFFIX);
+    final File file;
+    final File fileWithoutPatch;
+    try {
+      fileWithoutPatch = resolveFileInFileDataDir(fileDir, fileName);
+      file = resolveFileInFileDataDir(fileDir, fileName + PATCH_SUFFIX);
+    } catch (final IOException e) {
+      LOGGER.warn(e.getMessage());
+      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+    }
 
     // step2. check startIndex
-    final IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
+    final IndexCheckResult result = checkStartIndexValid(fileWithoutPatch, startIndex);
     if (!result.isResult()) {
       return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex());
     }
@@ -326,7 +339,7 @@ public class IoTDBLegacyPipeReceiverAgent {
       final byte[] byteArray = new byte[length];
       buff.get(byteArray);
       randomAccessFile.write(byteArray);
-      recordStartIndex(new File(fileDir, fileName), startIndex + length);
+      recordStartIndex(fileWithoutPatch, startIndex + length);
       LOGGER.debug(
           DataNodePipeMessages.SYNC_START_AT_TO_IS_DONE, fileName, startIndex, startIndex + length);
     } catch (final IOException e) {
@@ -337,6 +350,23 @@ public class IoTDBLegacyPipeReceiverAgent {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
+  private static File resolveFileInFileDataDir(final String fileDir, final String fileName)
+      throws IOException {
+    if (StringUtils.isEmpty(fileName)) {
+      throw new IOException(String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName));
+    }
+
+    final String illegalError = FileUtils.getIllegalError4Directory(fileName);
+    if (Objects.nonNull(illegalError)) {
+      throw new IOException(
+          String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName)
+              + ", "
+              + illegalError);
+    }
+
+    return PipeReceiverFilePathUtils.resolveFilePath(Paths.get(fileDir), fileName).toFile();
+  }
+
   private IndexCheckResult checkStartIndexValid(final File file, final long startIndex) {
     // get local index from memory map
     long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 829c9aed6b9..5ae7942d201 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -52,6 +53,9 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.session.pool.SessionPool;
@@ -66,6 +70,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -229,6 +234,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
               useSSL,
               trustStore,
               trustStorePwd);
+      openClientSession();
       final TSyncIdentityInfo identityInfo =
           new TSyncIdentityInfo(
               pipeName, System.currentTimeMillis(), syncConnectorVersion, databaseName);
@@ -259,6 +265,26 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
             .build();
   }
 
+  private void openClientSession() throws TException {
+    final TSOpenSessionReq openSessionReq = new TSOpenSessionReq();
+    openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+    openSessionReq.setUsername(user);
+    openSessionReq.setPassword(password);
+    openSessionReq.setZoneId(ZoneId.systemDefault().toString());
+    openSessionReq.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString());
+    openSessionReq.putToConfiguration("sql_dialect", "tree");
+
+    final TSOpenSessionResp openSessionResp = client.openSession(openSessionReq);
+    if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      final String errorMsg =
+          String.format(
+              "Failed to login to receiver %s:%s for legacy pipe transfer because %s",
+              ipAddress, port, openSessionResp.getStatus().getMessage());
+      LOGGER.warn(errorMsg);
+      throw new PipeRuntimeCriticalException(errorMsg);
+    }
+  }
+
   @Override
   public void heartbeat() throws Exception {
     // do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 35ac06036b2..b9069d5910d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TShowConfigurationTemplateResp;
 import org.apache.iotdb.commons.audit.AuditEventType;
 import org.apache.iotdb.commons.audit.AuditLogFields;
 import org.apache.iotdb.commons.audit.AuditLogOperation;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -3398,24 +3399,58 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @Override
   public TSStatus handshake(final TSyncIdentityInfo info) throws TException {
-    return PipeDataNodeAgent.receiver()
-        .legacy()
-        .handshake(
-            info,
-            SESSION_MANAGER.getCurrSession().getClientAddress(),
-            partitionFetcher,
-            schemaFetcher);
+    try {
+      final TSStatus status = checkLegacyPipeReceiverPermission();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+      return PipeDataNodeAgent.receiver()
+          .legacy()
+          .handshake(
+              info,
+              SESSION_MANAGER.getCurrSession().getClientAddress(),
+              partitionFetcher,
+              schemaFetcher);
+    } finally {
+      SESSION_MANAGER.updateIdleTime();
+    }
   }
 
   @Override
   public TSStatus sendPipeData(final ByteBuffer buff) throws TException {
-    return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
+    try {
+      final TSStatus status = checkLegacyPipeReceiverPermission();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+      return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
+    } finally {
+      SESSION_MANAGER.updateIdleTime();
+    }
   }
 
   @Override
   public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final ByteBuffer buff)
       throws TException {
-    return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
+    try {
+      final TSStatus status = checkLegacyPipeReceiverPermission();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return status;
+      }
+      return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
+    } finally {
+      SESSION_MANAGER.updateIdleTime();
+    }
+  }
+
+  private TSStatus checkLegacyPipeReceiverPermission() {
+    final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return getNotLoggedInStatus();
+    }
+    return AuthorityChecker.getTSStatus(
+        AuthorityChecker.checkSystemPermission(clientSession.getUsername(), PrivilegeType.USE_PIPE),
+        PrivilegeType.USE_PIPE);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
new file mode 100644
index 00000000000..5ce4df74f7f
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class IoTDBLegacyPipeReceiverAgentTest {
+
+  private static final String PIPE_NAME = "poc";
+  private static final long CREATE_TIME = 1700000000000L;
+  private static final String REMOTE_ADDRESS = "127.0.0.1";
+
+  private String originalSyncDir;
+  private Path syncDir;
+  private IoTDBLegacyPipeReceiverAgent agent;
+
+  @Before
+  public void setUp() throws Exception {
+    originalSyncDir = CommonDescriptor.getInstance().getConfig().getSyncDir();
+    syncDir = Files.createTempDirectory("legacy-pipe-receiver");
+    CommonDescriptor.getInstance().getConfig().setSyncDir(syncDir.toString());
+
+    agent = new IoTDBLegacyPipeReceiverAgent();
+    final TSStatus status =
+        agent.handshake(
+            new TSyncIdentityInfo(PIPE_NAME, CREATE_TIME, "UNKNOWN", ""),
+            REMOTE_ADDRESS,
+            null,
+            null);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (agent != null) {
+      agent.handleClientExit();
+    }
+    CommonDescriptor.getInstance().getConfig().setSyncDir(originalSyncDir);
+    if (syncDir != null) {
+      org.apache.tsfile.external.commons.io.FileUtils.deleteDirectory(syncDir.toFile());
+    }
+  }
+
+  @Test
+  public void testTransportFileRejectsPathTraversal() throws Exception {
+    final String traversal =
+        ".." + File.separator + ".." + File.separator + ".." + File.separator + "pwned";
+
+    final TSStatus status =
+        agent.transportFile(
+            new TSyncTransportMetaInfo(traversal, 0),
+            ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8)));
+
+    Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(), status.getCode());
+    Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+    Assert.assertFalse(Files.exists(syncDir.resolve("pwned.patch")));
+  }
+
+  @Test
+  public void testTransportFileWritesPlainFileUnderFileDataDir() throws Exception {
+    final String fileName = "1-2-3-4.tsfile";
+    final byte[] payload = "iotdb".getBytes(StandardCharsets.UTF_8);
+
+    final TSStatus status =
+        agent.transportFile(new TSyncTransportMetaInfo(fileName, 0), ByteBuffer.wrap(payload));
+
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+    final Path patchFile = getFileDataDir().resolve(fileName + ".patch");
+    Assert.assertArrayEquals(payload, Files.readAllBytes(patchFile));
+  }
+
+  @Test
+  public void testTransportPipeDataRejectsPathTraversalTsFileName() throws Exception {
+    final String traversal = ".." + File.separator + "evil.tsfile";
+
+    final TSStatus status =
+        agent.transportPipeData(ByteBuffer.wrap(new TsFilePipeData("", traversal, -1).serialize()));
+
+    Assert.assertEquals(TSStatusCode.PIPESERVER_ERROR.getStatusCode(), status.getCode());
+    Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+  }
+
+  private Path getFileDataDir() {
+    return syncDir
+        .resolve("receiver")
+        .resolve(String.format("%s-%d-%s", PIPE_NAME, CREATE_TIME, REMOTE_ADDRESS))
+        .resolve("file-data");
+  }
+}

Reply via email to