This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f6e3a12eab9 [To dev/1.3] Pipe: Banned the illegal names in pipe and 
pipe plugins (#17145) (#17156)
f6e3a12eab9 is described below

commit f6e3a12eab9a2f3091889b8b026817fe4bdc5070
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 11:44:09 2026 +0800

    [To dev/1.3] Pipe: Banned the illegal names in pipe and pipe plugins 
(#17145) (#17156)
    
    * Pipe: Banned the illegal names in pipe and pipe plugins (#17145)
    
    * fix
    
    * fix
    
    * fix
    
    * sp-linux
    
    * fix
    
    * fix
---
 .../pipe/it/autocreate/IoTDBPipeSyntaxIT.java      | 64 ++++++++++++++++++++++
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  5 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 49 ++++++++++++-----
 .../org/apache/iotdb/commons/utils/FileUtils.java  | 12 ++++
 .../apache/iotdb/commons/utils/WindowsOSUtils.java | 64 ++++++++++++++++++++++
 5 files changed, 179 insertions(+), 15 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
index 26cb3e9e733..4818378c2f5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.lang3.SystemUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -38,6 +39,7 @@ import java.io.File;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -316,6 +318,68 @@ public class IoTDBPipeSyntaxIT extends 
AbstractPipeDualAutoIT {
     }
   }
 
+  @Test
+  public void testDirectoryErrors() throws SQLException {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      List<String> wrongDirs = Arrays.asList(".", "..", "./hackYou", 
".\\hackYouTwice");
+      if (SystemUtils.IS_OS_WINDOWS) {
+        wrongDirs = new ArrayList<>(wrongDirs);
+        wrongDirs.add("BombWindows/:*?");
+        wrongDirs.add("AUX");
+      }
+      for (final String name : wrongDirs) {
+        testDirectoryError(name, statement);
+      }
+    }
+  }
+
+  private void testDirectoryError(final String wrongDir, final Statement 
statement) {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try {
+      statement.execute(
+          String.format(
+              "create pipe `"
+                  + wrongDir
+                  + "` with source ()"
+                  + " with processor ()"
+                  + " with sink ("
+                  + "'sink'='invalid-param',"
+                  + "'sink.ip'='%s',"
+                  + "'sink.port'='%s',"
+                  + "'sink.batch.enable'='false')",
+              receiverIp,
+              receiverPort));
+      fail();
+    } catch (final Exception ignore) {
+      // Expected
+    }
+
+    try {
+      statement.execute(
+          String.format(
+              "create pipePlugin `"
+                  + wrongDir
+                  + "` as 'org.apache.iotdb.db.pipe.example.TestProcessor' 
USING URI '%s'",
+              new File(
+                          System.getProperty("user.dir")
+                              + File.separator
+                              + "target"
+                              + File.separator
+                              + "test-classes"
+                              + File.separator)
+                      .toURI()
+                  + "PipePlugin.jar"));
+      fail();
+    } catch (final SQLException e) {
+      Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe 
plugin"));
+    }
+  }
+
   @Test
   public void testBrackets() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index dc4f6192d52..32eade04262 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.sink.payload.legacy.PipeData;
@@ -52,6 +53,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -124,7 +126,8 @@ public class IoTDBLegacyPipeReceiverAgent {
   }
 
   private boolean validatePipeName(final TSyncIdentityInfo info) {
-    return info.isSetPipeName() && 
!info.getPipeName().contains(File.separator);
+    return info.isSetPipeName()
+        && 
Objects.isNull(FileUtils.getIllegalError4Directory(info.getPipeName()));
   }
 
   private void createConnection(final SyncIdentityInfo identityInfo) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d4ba79d2c5d..784e890b1e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -62,6 +62,7 @@ import 
org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoader;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -290,6 +291,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -813,6 +815,15 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     final String className = createPipePluginStatement.getClassName();
     final String uriString = createPipePluginStatement.getUriString();
 
+    final String pathError = FileUtils.getIllegalError4Directory(pluginName);
+    if (Objects.nonNull(pathError)) {
+      future.setException(
+          new IoTDBException(
+              String.format("Failed to create pipe plugin %s. " + pathError, 
pluginName),
+              TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode()));
+      return future;
+    }
+
     if (uriString == null || uriString.isEmpty()) {
       future.setException(
           new IoTDBException(
@@ -1774,18 +1785,28 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement 
createPipeStatement) {
-    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+  public SettableFuture<ConfigTaskResult> createPipe(
+      final CreatePipeStatement createPipeStatement) {
+    final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
 
-    // Validate pipe name
-    if 
(createPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX))
 {
-      String exceptionMessage =
-          String.format(
-              "Failed to create pipe %s, pipe name starting with \"%s\" are 
not allowed to be created.",
-              createPipeStatement.getPipeName(), 
PipeStaticMeta.SYSTEM_PIPE_PREFIX);
-      LOGGER.warn(exceptionMessage);
+    // Verify that Pipe is disabled if TSFile encryption is enabled
+    final String pipeName = createPipeStatement.getPipeName();
+    if (pipeName.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
       future.setException(
-          new IoTDBException(exceptionMessage, 
TSStatusCode.PIPE_ERROR.getStatusCode()));
+          new IoTDBException(
+              String.format(
+                  "Failed to create pipe %s, pipe name starting with \"%s\" 
are not allowed to be created.",
+                  pipeName, PipeStaticMeta.SYSTEM_PIPE_PREFIX),
+              TSStatusCode.PIPE_ERROR.getStatusCode()));
+      return future;
+    }
+
+    final String pathError = FileUtils.getIllegalError4Directory(pipeName);
+    if (Objects.nonNull(pathError)) {
+      future.setException(
+          new IoTDBException(
+              String.format("Failed to create pipe %s, " + pathError, 
pipeName),
+              TSStatusCode.PIPE_ERROR.getStatusCode()));
       return future;
     }
 
@@ -1793,7 +1814,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     try {
       PipeDataNodeAgent.plugin()
           .validate(
-              createPipeStatement.getPipeName(),
+              pipeName,
               createPipeStatement.getExtractorAttributes(),
               createPipeStatement.getProcessorAttributes(),
               createPipeStatement.getConnectorAttributes());
@@ -1816,7 +1837,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         final TCreatePipeReq realtimeReq =
             new TCreatePipeReq()
                 // Append suffix to the pipeline name for real-time data
-                .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+                .setPipeName(pipeName + "_realtime")
                 // NOTE: set if not exists always to true to handle partial 
failure
                 .setIfNotExistsCondition(true)
                 // Use extractor parameters for real-time data
@@ -1844,7 +1865,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         final TCreatePipeReq historyReq =
             new TCreatePipeReq()
                 // Append suffix to the pipeline name for historical data
-                .setPipeName(createPipeStatement.getPipeName() + "_history")
+                .setPipeName(pipeName + "_history")
                 
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
                 // Use source parameters for historical data
                 .setExtractorAttributes(
@@ -1887,7 +1908,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       TCreatePipeReq req =
           new TCreatePipeReq()
-              .setPipeName(createPipeStatement.getPipeName())
+              .setPipeName(pipeName)
               
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
               
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
               
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index fe9ac7c733c..7c68d7d376f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -54,6 +54,8 @@ public class FileUtils {
       "Renamed file {} to {} because it already exists in the target 
directory: {}";
   private static final String COPY_FILE_MESSAGE =
       "Copy file {} to {} because it already exists in the target directory: 
{}";
+  private static final String ILLEGAL_PATH_MESSAGE =
+      "The path cannot be '.', '..', './' or '.\\'. ";
 
   private FileUtils() {}
 
@@ -552,4 +554,14 @@ public class FileUtils {
         targetFile,
         targetFile.getParentFile().getAbsolutePath());
   }
+
+  public static String getIllegalError4Directory(final String path) {
+    if (path.equals(".") || path.equals("..") || path.contains("./") || 
path.contains(".\\")) {
+      return ILLEGAL_PATH_MESSAGE;
+    }
+    if (!WindowsOSUtils.isLegalPathSegment4Windows(path)) {
+      return WindowsOSUtils.OS_SEGMENT_ERROR;
+    }
+    return null;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java
new file mode 100644
index 00000000000..3019c47cf0e
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils;
+
+import org.apache.commons.lang3.SystemUtils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WindowsOSUtils {
+  private static final String ILLEGAL_WINDOWS_CHARS = "\\/:*?\"<>|";
+  private static final Set<String> ILLEGAL_WINDOWS_NAMES =
+      new HashSet<>(Arrays.asList("CON", "PRN", "AUX", "NUL", "COM1-COM9, 
LPT1-LPT9"));
+
+  static {
+    for (int i = 0; i < 10; ++i) {
+      ILLEGAL_WINDOWS_NAMES.add("COM" + i);
+      ILLEGAL_WINDOWS_NAMES.add("LPT" + i);
+    }
+  }
+
+  public static final String OS_SEGMENT_ERROR =
+      String.format(
+          "In Windows System, the path shall not contains %s, equals one of 
%s, or ends with '.' or ' '.",
+          ILLEGAL_WINDOWS_CHARS, ILLEGAL_WINDOWS_NAMES);
+
+  public static boolean isLegalPathSegment4Windows(final String pathSegment) {
+    if (!SystemUtils.IS_OS_WINDOWS) {
+      return true;
+    }
+    for (final char illegalChar : ILLEGAL_WINDOWS_CHARS.toCharArray()) {
+      if (pathSegment.indexOf(illegalChar) != -1) {
+        return false;
+      }
+    }
+    if (pathSegment.endsWith(".") || pathSegment.endsWith(" ")) {
+      return false;
+    }
+    for (final String illegalName : ILLEGAL_WINDOWS_NAMES) {
+      if (pathSegment.equalsIgnoreCase(illegalName)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Reply via email to