This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.3.7 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a48faf34ed2194e63a49de12ba82433ced436ba4 Author: Caideyipi <[email protected]> AuthorDate: Wed Feb 4 11:44:09 2026 +0800 [To dev/1.3] Pipe: Banned the illegal names in pipe and pipe plugins (#17145) (#17156) * Pipe: Banned the illegal names in pipe and pipe plugins (#17145) * fix * fix * fix * sp-linux * fix * fix --- .../pipe/it/autocreate/IoTDBPipeSyntaxIT.java | 64 ++++++++++++++++++++++ .../legacy/IoTDBLegacyPipeReceiverAgent.java | 5 +- .../config/executor/ClusterConfigTaskExecutor.java | 49 ++++++++++++----- .../org/apache/iotdb/commons/utils/FileUtils.java | 12 ++++ .../apache/iotdb/commons/utils/WindowsOSUtils.java | 64 ++++++++++++++++++++++ 5 files changed, 179 insertions(+), 15 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java index 26cb3e9e733..4818378c2f5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java @@ -29,6 +29,7 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.commons.lang3.SystemUtils; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -38,6 +39,7 @@ import java.io.File; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -316,6 +318,68 @@ public class IoTDBPipeSyntaxIT extends AbstractPipeDualAutoIT { } } + @Test + public void testDirectoryErrors() throws SQLException { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + List<String> wrongDirs = Arrays.asList(".", "..", "./hackYou", ".\\hackYouTwice"); + if (SystemUtils.IS_OS_WINDOWS) { + wrongDirs = new ArrayList<>(wrongDirs); + wrongDirs.add("BombWindows/:*?"); + wrongDirs.add("AUX"); + } + for (final String name : wrongDirs) { + testDirectoryError(name, statement); + } + } + } + + private void testDirectoryError(final String wrongDir, final Statement statement) { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try { + statement.execute( + String.format( + "create pipe `" + + wrongDir + + "` with source ()" + + " with processor ()" + + " with sink (" + + "'sink'='invalid-param'," + + "'sink.ip'='%s'," + + "'sink.port'='%s'," + + "'sink.batch.enable'='false')", + receiverIp, + receiverPort)); + fail(); + } catch (final Exception ignore) { + // Expected + } + + try { + statement.execute( + String.format( + "create pipePlugin `" + + wrongDir + + "` as 'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI '%s'", + new File( + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator) + .toURI() + + "PipePlugin.jar")); + fail(); + } catch (final SQLException e) { + Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe plugin")); + } + } + @Test public void testBrackets() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java index dc4f6192d52..32eade04262 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.sink.payload.legacy.PipeData; @@ -52,6 +53,7 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -124,7 +126,8 @@ public class IoTDBLegacyPipeReceiverAgent { } private boolean validatePipeName(final TSyncIdentityInfo info) { - return info.isSetPipeName() && !info.getPipeName().contains(File.separator); + return info.isSetPipeName() + && Objects.isNull(FileUtils.getIllegalError4Directory(info.getPipeName())); } private void createConnection(final SyncIdentityInfo identityInfo) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index b711b652829..74fd8421cef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; @@ -290,6 +291,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -813,6 +815,15 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final String className = createPipePluginStatement.getClassName(); final String uriString = createPipePluginStatement.getUriString(); + final String pathError = FileUtils.getIllegalError4Directory(pluginName); + if (Objects.nonNull(pathError)) { + future.setException( + new IoTDBException( + String.format("Failed to create pipe plugin %s. " + pathError, pluginName), + TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())); + return future; + } + if (uriString == null || uriString.isEmpty()) { future.setException( new IoTDBException( @@ -1774,18 +1785,28 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } @Override - public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) { - SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + public SettableFuture<ConfigTaskResult> createPipe( + final CreatePipeStatement createPipeStatement) { + final SettableFuture<ConfigTaskResult> future = SettableFuture.create(); - // Validate pipe name - if (createPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) { - String exceptionMessage = - String.format( - "Failed to create pipe %s, pipe name starting with \"%s\" are not allowed to be created.", - createPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX); - LOGGER.warn(exceptionMessage); + // Verify that Pipe is disabled if TSFile encryption is enabled + final String pipeName = createPipeStatement.getPipeName(); + if (pipeName.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) { future.setException( - new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode())); + new IoTDBException( + String.format( + "Failed to create pipe %s, pipe name starting with \"%s\" are not allowed to be created.", + pipeName, PipeStaticMeta.SYSTEM_PIPE_PREFIX), + TSStatusCode.PIPE_ERROR.getStatusCode())); + return future; + } + + final String pathError = FileUtils.getIllegalError4Directory(pipeName); + if (Objects.nonNull(pathError)) { + future.setException( + new IoTDBException( + String.format("Failed to create pipe %s, " + pathError, pipeName), + TSStatusCode.PIPE_ERROR.getStatusCode())); return future; } @@ -1793,7 +1814,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try { PipeDataNodeAgent.plugin() .validate( - createPipeStatement.getPipeName(), + pipeName, createPipeStatement.getExtractorAttributes(), createPipeStatement.getProcessorAttributes(), createPipeStatement.getConnectorAttributes()); @@ -1816,7 +1837,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final TCreatePipeReq realtimeReq = new TCreatePipeReq() // Append suffix to the pipeline name for real-time data - .setPipeName(createPipeStatement.getPipeName() + "_realtime") + .setPipeName(pipeName + "_realtime") // NOTE: set if not exists always to true to handle partial failure .setIfNotExistsCondition(true) // Use extractor parameters for real-time data @@ -1844,7 +1865,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { final TCreatePipeReq historyReq = new TCreatePipeReq() // Append suffix to the pipeline name for historical data - .setPipeName(createPipeStatement.getPipeName() + "_history") + .setPipeName(pipeName + "_history") .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) // Use source parameters for historical data .setExtractorAttributes( @@ -1887,7 +1908,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TCreatePipeReq req = new TCreatePipeReq() - .setPipeName(createPipeStatement.getPipeName()) + .setPipeName(pipeName) .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java index fe9ac7c733c..7c68d7d376f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java @@ -54,6 +54,8 @@ public class FileUtils { "Renamed file {} to {} because it already exists in the target directory: {}"; private static final String COPY_FILE_MESSAGE = "Copy file {} to {} because it already exists in the target directory: {}"; + private static final String ILLEGAL_PATH_MESSAGE = + "The path cannot be '.', '..', './' or '.\\'. "; private FileUtils() {} @@ -552,4 +554,14 @@ public class FileUtils { targetFile, targetFile.getParentFile().getAbsolutePath()); } + + public static String getIllegalError4Directory(final String path) { + if (path.equals(".") || path.equals("..") || path.contains("./") || path.contains(".\\")) { + return ILLEGAL_PATH_MESSAGE; + } + if (!WindowsOSUtils.isLegalPathSegment4Windows(path)) { + return WindowsOSUtils.OS_SEGMENT_ERROR; + } + return null; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java new file mode 100644 index 00000000000..3019c47cf0e --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils; + +import org.apache.commons.lang3.SystemUtils; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +public class WindowsOSUtils { + private static final String ILLEGAL_WINDOWS_CHARS = "\\/:*?\"<>|"; + private static final Set<String> ILLEGAL_WINDOWS_NAMES = + new HashSet<>(Arrays.asList("CON", "PRN", "AUX", "NUL", "COM1-COM9, LPT1-LPT9")); + + static { + for (int i = 0; i < 10; ++i) { + ILLEGAL_WINDOWS_NAMES.add("COM" + i); + ILLEGAL_WINDOWS_NAMES.add("LPT" + i); + } + } + + public static final String OS_SEGMENT_ERROR = + String.format( + "In Windows System, the path shall not contains %s, equals one of %s, or ends with '.' or ' '.", + ILLEGAL_WINDOWS_CHARS, ILLEGAL_WINDOWS_NAMES); + + public static boolean isLegalPathSegment4Windows(final String pathSegment) { + if (!SystemUtils.IS_OS_WINDOWS) { + return true; + } + for (final char illegalChar : ILLEGAL_WINDOWS_CHARS.toCharArray()) { + if (pathSegment.indexOf(illegalChar) != -1) { + return false; + } + } + if (pathSegment.endsWith(".") || pathSegment.endsWith(" ")) { + return false; + } + for (final String illegalName : ILLEGAL_WINDOWS_NAMES) { + if (pathSegment.equalsIgnoreCase(illegalName)) { + return false; + } + } + return true; + } +}
