This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.7 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5d33826dc4375f13b391466ce9753964618afdd3 Author: Caideyipi <[email protected]> AuthorDate: Wed Feb 4 09:38:44 2026 +0800 Pipe: Banned the illegal names in pipe and pipe plugins (#17145) * fix * fix * fix * sp-linux (cherry picked from commit 52d7245a2b849c80bb424bb8fc6b67ad3ced2311) --- .../treemodel/auto/basic/IoTDBPipeSyntaxIT.java | 64 ++++++++++++++++++++++ .../legacy/IoTDBLegacyPipeReceiverAgent.java | 5 +- .../config/executor/ClusterConfigTaskExecutor.java | 34 +++++++++--- .../org/apache/iotdb/commons/utils/FileUtils.java | 12 ++++ 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java index e925e9e4428..74c5598d616 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java @@ -31,6 +31,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.external.commons.lang3.SystemUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -40,6 +41,7 @@ import org.junit.runner.RunWith; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -335,6 +337,68 @@ public class IoTDBPipeSyntaxIT extends AbstractPipeDualTreeModelAutoIT { } } + @Test + public void testDirectoryErrors() throws SQLException { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + List<String> wrongDirs = Arrays.asList(".", "..", "./hackYou", ".\\hackYouTwice"); + if (SystemUtils.IS_OS_WINDOWS) { + wrongDirs = new ArrayList<>(wrongDirs); + wrongDirs.add("BombWindows/:*?"); + wrongDirs.add("AUX"); + } + for (final String name : wrongDirs) { + testDirectoryError(name, statement); + } + } + } + + private void testDirectoryError(final String wrongDir, final Statement statement) { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try { + statement.execute( + String.format( + "create pipe `" + + wrongDir + + "` with source ()" + + " with processor ()" + + " with sink (" + + "'sink'='invalid-param'," + + "'sink.ip'='%s'," + + "'sink.port'='%s'," + + "'sink.batch.enable'='false')", + receiverIp, + receiverPort)); + fail(); + } catch (final Exception ignore) { + // Expected + } + + try { + statement.execute( + String.format( + "create pipePlugin `" + + wrongDir + + "` as 'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI '%s'", + new File( + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator) + .toURI() + + "PipePlugin.jar")); + fail(); + } catch (final SQLException e) { + Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe plugin")); + } + } + @Test public void testBrackets() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java index 759a65cffa7..6ce802c3143 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.sink.payload.legacy.PipeData; @@ -53,6 +54,7 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -125,7 +127,8 @@ public class IoTDBLegacyPipeReceiverAgent { } private boolean validatePipeName(final TSyncIdentityInfo info) { - return info.isSetPipeName() && !info.getPipeName().contains(File.separator); + return info.isSetPipeName() + && Objects.isNull(FileUtils.getIllegalError4Directory(info.getPipeName())); } private void createConnection(final SyncIdentityInfo identityInfo) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index ca02ee1ec13..ef2ee19dc37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -74,6 +74,7 @@ import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TAINodeRemoveReq; @@ -944,6 +945,15 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final String className = createPipePluginStatement.getClassName(); final String uriString = createPipePluginStatement.getUriString(); + final String pathError = FileUtils.getIllegalError4Directory(pluginName); + if (Objects.nonNull(pathError)) { + future.setException( + new IoTDBException( + String.format("Failed to create pipe plugin %s. " + pathError, pluginName), + TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())); + return future; + } + if (uriString == null || uriString.isEmpty()) { future.setException( new IoTDBException( @@ -2065,6 +2075,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final SettableFuture<ConfigTaskResult> future = SettableFuture.create(); // Verify that Pipe is disabled if TSFile encryption is enabled + final String pipeName = createPipeStatement.getPipeName(); if (!Objects.equals(TSFileDescriptor.getInstance().getConfig().getEncryptType(), "UNENCRYPTED") && !Objects.equals( TSFileDescriptor.getInstance().getConfig().getEncryptType(), @@ -2073,18 +2084,27 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { new IoTDBException( String.format( "Failed to create Pipe %s because TSFile is configured with encryption, which prohibits the use of Pipe", - createPipeStatement.getPipeName()), + pipeName), TSStatusCode.PIPE_ERROR.getStatusCode())); return future; } // Validate pipe name - if (createPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) { + if (pipeName.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) { future.setException( new IoTDBException( String.format( "Failed to create pipe %s, pipe name starting with \"%s\" are not allowed to be created.", - createPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX), + pipeName, PipeStaticMeta.SYSTEM_PIPE_PREFIX), + TSStatusCode.PIPE_ERROR.getStatusCode())); + return future; + } + + final String pathError = FileUtils.getIllegalError4Directory(pipeName); + if (Objects.nonNull(pathError)) { + future.setException( + new IoTDBException( + String.format("Failed to create pipe %s, " + pathError, pipeName), TSStatusCode.PIPE_ERROR.getStatusCode())); return future; } @@ -2093,7 +2113,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try { PipeDataNodeAgent.plugin() .validate( - createPipeStatement.getPipeName(), + pipeName, createPipeStatement.getExtractorAttributes(), createPipeStatement.getProcessorAttributes(), createPipeStatement.getConnectorAttributes()); @@ -2115,7 +2135,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final TCreatePipeReq realtimeReq = new TCreatePipeReq() // Append suffix to the pipeline name for real-time data - .setPipeName(createPipeStatement.getPipeName() + "_realtime") + .setPipeName(pipeName + "_realtime") // NOTE: set if not exists always to true to handle partial failure .setIfNotExistsCondition(true) // Use extractor parameters for real-time data @@ -2143,7 +2163,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final TCreatePipeReq historyReq = new TCreatePipeReq() // Append suffix to the pipeline name for historical data - .setPipeName(createPipeStatement.getPipeName() + "_history") + .setPipeName(pipeName + "_history") .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) // Use source parameters for historical data .setExtractorAttributes( @@ -2186,7 +2206,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TCreatePipeReq req = new TCreatePipeReq() - .setPipeName(createPipeStatement.getPipeName()) + .setPipeName(pipeName) .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java index a0eb45f0bd6..2e1d3d16a00 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java @@ -57,6 +57,8 @@ public class FileUtils { "Renamed file {} to {} because it already exists in the target directory: {}"; private static final String COPY_FILE_MESSAGE = "Copy file {} to {} because it already exists in the target directory: {}"; + private static final String ILLEGAL_PATH_MESSAGE = + "The path cannot be '.', '..', './' or '.\\'. "; private FileUtils() {} @@ -564,4 +566,14 @@ public class FileUtils { targetFile, targetFile.getParentFile().getAbsolutePath()); } + + public static String getIllegalError4Directory(final String path) { + if (path.equals(".") || path.equals("..") || path.contains("./") || path.contains(".\\")) { + return ILLEGAL_PATH_MESSAGE; + } + if (!WindowsOSUtils.isLegalPathSegment4Windows(path)) { + return WindowsOSUtils.OS_SEGMENT_ERROR; + } + return null; + } }
