This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e94f0e8ed11 [dev/1.3] Pipe: Harden legacy pipe file transfer
validation and access checks (#17791)
e94f0e8ed11 is described below
commit e94f0e8ed11b7a5c95f888f76a26e703aa3c03b1
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 15:08:54 2026 +0800
[dev/1.3] Pipe: Harden legacy pipe file transfer validation and access
checks (#17791)
* Pipe: Harden legacy pipe file transfer validation and access checks
(#17741)
* Fix
* fix
* Fix legacy pipe receiver test FileUtils import
---
.../single/IoTDBLegacyPipeReceiverSecurityIT.java | 112 +++++++++++++++++++
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 50 +++++++--
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 26 +++++
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 54 +++++++--
.../legacy/IoTDBLegacyPipeReceiverAgentTest.java | 123 +++++++++++++++++++++
5 files changed, 344 insertions(+), 21 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
new file mode 100644
index 00000000000..51e1e211535
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.single;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBLegacyPipeReceiverSecurityIT {
+
+ @BeforeClass
+ public static void setUp() {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testRejectPathTraversalFileNameInLegacyTransportFile() throws
Exception {
+ final DataNodeWrapper dataNode = EnvFactory.getEnv().getDataNodeWrapper(0);
+
+ try (final IoTDBSyncClient client =
+ new IoTDBSyncClient(
+ new ThriftClientProperty.Builder().build(),
+ dataNode.getIp(),
+ dataNode.getPort(),
+ false,
+ null,
+ null)) {
+ final TSOpenSessionResp openSessionResp =
client.openSession(createOpenSessionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
openSessionResp.getStatus().getCode());
+
+ try {
+ final TSStatus handshakeStatus =
+ client.handshake(
+ new TSyncIdentityInfo(
+ "pathTraversalPipe", System.currentTimeMillis(),
"UNKNOWN", ""));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
handshakeStatus.getCode());
+
+ final String maliciousFileName =
+ ".." + File.separator + ".." + File.separator + "pwned.tsfile";
+ final TSStatus status =
+ client.sendFile(
+ new TSyncTransportMetaInfo(maliciousFileName, 0),
+ ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8)));
+
+ Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(),
status.getCode());
+ Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+ } finally {
+ client.closeSession(new
TSCloseSessionReq(openSessionResp.getSessionId()));
+ }
+ }
+ }
+
+ private TSOpenSessionReq createOpenSessionReq() {
+ final TSOpenSessionReq req = new TSOpenSessionReq();
+ req.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ req.setUsername("root");
+ req.setPassword("root");
+ req.setZoneId(ZoneId.systemDefault().toString());
+ req.putToConfiguration("version",
IoTDBConstant.ClientVersion.V_1_0.toString());
+ req.putToConfiguration("sql_dialect", "tree");
+ return req;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index ae7ab22308a..cc85e2f4f10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -51,6 +52,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
@@ -249,10 +251,12 @@ public class IoTDBLegacyPipeReceiverAgent {
* @param tsFilePipeData pipeData
* @param fileDir path of file data dir
*/
- private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String
fileDir) {
- String tsFileName = tsFilePipeData.getTsFileName();
- File dir = new File(fileDir);
- File[] targetFiles =
+ private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final
String fileDir)
+ throws IOException {
+ final String tsFileName = tsFilePipeData.getTsFileName();
+ final File tsFile = resolveFileInFileDataDir(fileDir, tsFileName);
+ final File dir = tsFile.getParentFile();
+ final File[] targetFiles =
dir.listFiles((dir1, name) -> name.startsWith(tsFileName) &&
name.endsWith(PATCH_SUFFIX));
if (targetFiles != null) {
for (File targetFile : targetFiles) {
@@ -289,13 +293,21 @@ public class IoTDBLegacyPipeReceiverAgent {
LOGGER.debug(
"Invoke transportData method from client ip = {}",
identityInfo.getRemoteAddress());
- String fileDir = getFileDataDir(identityInfo);
- String fileName = metaInfo.fileName;
- long startIndex = metaInfo.startIndex;
- File file = new File(fileDir, fileName + PATCH_SUFFIX);
+ final String fileDir = getFileDataDir(identityInfo);
+ final String fileName = metaInfo.fileName;
+ final long startIndex = metaInfo.startIndex;
+ final File file;
+ final File fileWithoutPatch;
+ try {
+ fileWithoutPatch = resolveFileInFileDataDir(fileDir, fileName);
+ file = resolveFileInFileDataDir(fileDir, fileName + PATCH_SUFFIX);
+ } catch (final IOException e) {
+ LOGGER.warn(e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+ }
// step2. check startIndex
- IndexCheckResult result = checkStartIndexValid(new File(fileDir,
fileName), startIndex);
+ final IndexCheckResult result = checkStartIndexValid(fileWithoutPatch,
startIndex);
if (!result.isResult()) {
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR,
result.getIndex());
}
@@ -307,9 +319,9 @@ public class IoTDBLegacyPipeReceiverAgent {
byte[] byteArray = new byte[length];
buff.get(byteArray);
randomAccessFile.write(byteArray);
- recordStartIndex(new File(fileDir, fileName), startIndex + length);
+ recordStartIndex(fileWithoutPatch, startIndex + length);
LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex,
startIndex + length);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.error(e.getMessage());
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
}
@@ -317,7 +329,21 @@ public class IoTDBLegacyPipeReceiverAgent {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
- private IndexCheckResult checkStartIndexValid(File file, long startIndex) {
+ private static File resolveFileInFileDataDir(final String fileDir, final
String fileName)
+ throws IOException {
+ if (StringUtils.isEmpty(fileName)) {
+ throw new IOException("Illegal fileName: " + fileName);
+ }
+
+ final String illegalError = FileUtils.getIllegalError4Directory(fileName);
+ if (Objects.nonNull(illegalError)) {
+ throw new IOException("Illegal fileName: " + fileName + ", " +
illegalError);
+ }
+
+ return PipeReceiverFilePathUtils.resolveFilePath(Paths.get(fileDir),
fileName).toFile();
+ }
+
+ private IndexCheckResult checkStartIndexValid(final File file, final long
startIndex) {
// get local index from memory map
long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
// get local index from file
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 4328c758d39..0a090f99b8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -50,6 +51,9 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.session.pool.SessionPool;
@@ -64,6 +68,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -224,6 +229,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
useSSL,
trustStore,
trustStorePwd);
+ openClientSession();
final TSyncIdentityInfo identityInfo =
new TSyncIdentityInfo(
pipeName, System.currentTimeMillis(), syncConnectorVersion,
databaseName);
@@ -254,6 +260,26 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
.build();
}
+ private void openClientSession() throws TException {
+ final TSOpenSessionReq openSessionReq = new TSOpenSessionReq();
+
openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ openSessionReq.setUsername(user);
+ openSessionReq.setPassword(password);
+ openSessionReq.setZoneId(ZoneId.systemDefault().toString());
+ openSessionReq.putToConfiguration("version",
IoTDBConstant.ClientVersion.V_1_0.toString());
+ openSessionReq.putToConfiguration("sql_dialect", "tree");
+
+ final TSOpenSessionResp openSessionResp =
client.openSession(openSessionReq);
+ if (openSessionResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ final String errorMsg =
+ String.format(
+ "Failed to login to receiver %s:%s for legacy pipe transfer
because %s",
+ ipAddress, port, openSessionResp.getStatus().getMessage());
+ LOGGER.warn(errorMsg);
+ throw new PipeRuntimeCriticalException(errorMsg);
+ }
+ }
+
@Override
public void heartbeat() throws Exception {
// do nothing
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index f8bd094b69e..3d34def409f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
import org.apache.iotdb.common.rpc.thrift.TShowConfigurationTemplateResp;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -2746,24 +2747,59 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
@Override
public TSStatus handshake(final TSyncIdentityInfo info) throws TException {
- return PipeDataNodeAgent.receiver()
- .legacy()
- .handshake(
- info,
- SESSION_MANAGER.getCurrSession().getClientAddress(),
- partitionFetcher,
- schemaFetcher);
+ try {
+ final TSStatus status = checkLegacyPipeReceiverPermission();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return PipeDataNodeAgent.receiver()
+ .legacy()
+ .handshake(
+ info,
+ SESSION_MANAGER.getCurrSession().getClientAddress(),
+ partitionFetcher,
+ schemaFetcher);
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
}
@Override
public TSStatus sendPipeData(final ByteBuffer buff) throws TException {
- return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
+ try {
+ final TSStatus status = checkLegacyPipeReceiverPermission();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
}
@Override
public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final
ByteBuffer buff)
throws TException {
- return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
+ try {
+ final TSStatus status = checkLegacyPipeReceiverPermission();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo,
buff);
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
+ }
+
+ private TSStatus checkLegacyPipeReceiverPermission() {
+ final IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return getNotLoggedInStatus();
+ }
+ return AuthorityChecker.getTSStatus(
+ AuthorityChecker.checkSystemPermission(
+ clientSession.getUsername(), PrivilegeType.USE_PIPE.ordinal()),
+ PrivilegeType.USE_PIPE);
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
new file mode 100644
index 00000000000..eaf21aef1fd
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class IoTDBLegacyPipeReceiverAgentTest {
+
+ private static final String PIPE_NAME = "poc";
+ private static final long CREATE_TIME = 1700000000000L;
+ private static final String REMOTE_ADDRESS = "127.0.0.1";
+
+ private String originalSyncDir;
+ private Path syncDir;
+ private IoTDBLegacyPipeReceiverAgent agent;
+
+ @Before
+ public void setUp() throws Exception {
+ originalSyncDir = CommonDescriptor.getInstance().getConfig().getSyncDir();
+ syncDir = Files.createTempDirectory("legacy-pipe-receiver");
+ CommonDescriptor.getInstance().getConfig().setSyncDir(syncDir.toString());
+
+ agent = new IoTDBLegacyPipeReceiverAgent();
+ final TSStatus status =
+ agent.handshake(
+ new TSyncIdentityInfo(PIPE_NAME, CREATE_TIME, "UNKNOWN", ""),
+ REMOTE_ADDRESS,
+ null,
+ null);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (agent != null) {
+ agent.handleClientExit();
+ }
+ CommonDescriptor.getInstance().getConfig().setSyncDir(originalSyncDir);
+ if (syncDir != null) {
+ FileUtils.deleteDirectory(syncDir.toFile());
+ }
+ }
+
+ @Test
+ public void testTransportFileRejectsPathTraversal() throws Exception {
+ final String traversal =
+ ".." + File.separator + ".." + File.separator + ".." + File.separator
+ "pwned";
+
+ final TSStatus status =
+ agent.transportFile(
+ new TSyncTransportMetaInfo(traversal, 0),
+ ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8)));
+
+ Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(),
status.getCode());
+ Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+ Assert.assertFalse(Files.exists(syncDir.resolve("pwned.patch")));
+ }
+
+ @Test
+ public void testTransportFileWritesPlainFileUnderFileDataDir() throws
Exception {
+ final String fileName = "1-2-3-4.tsfile";
+ final byte[] payload = "iotdb".getBytes(StandardCharsets.UTF_8);
+
+ final TSStatus status =
+ agent.transportFile(new TSyncTransportMetaInfo(fileName, 0),
ByteBuffer.wrap(payload));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ final Path patchFile = getFileDataDir().resolve(fileName + ".patch");
+ Assert.assertArrayEquals(payload, Files.readAllBytes(patchFile));
+ }
+
+ @Test
+ public void testTransportPipeDataRejectsPathTraversalTsFileName() throws
Exception {
+ final String traversal = ".." + File.separator + "evil.tsfile";
+
+ final TSStatus status =
+ agent.transportPipeData(ByteBuffer.wrap(new TsFilePipeData("",
traversal, -1).serialize()));
+
+ Assert.assertEquals(TSStatusCode.PIPESERVER_ERROR.getStatusCode(),
status.getCode());
+ Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+ }
+
+ private Path getFileDataDir() {
+ return syncDir
+ .resolve("receiver")
+ .resolve(String.format("%s-%d-%s", PIPE_NAME, CREATE_TIME,
REMOTE_ADDRESS))
+ .resolve("file-data");
+ }
+}