This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.7
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8fe6f510c91c33f905e9e771e963eb6678562639
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 14:14:58 2026 +0800

    Fixed the NPE when validating legacy sink (#17153)
    
    * npe-fix
    
    * unb
    
    (cherry picked from commit c1f4682c207dd9e45fe363d46ccae927f76fee9d)
---
 .../treemodel/auto/basic/IoTDBPipeDataSinkIT.java  | 32 ++++-------
 .../treemodel/auto/basic/IoTDBPipeSyntaxIT.java    |  3 +-
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  | 11 ++--
 .../apache/iotdb/commons/utils/WindowsOSUtils.java | 64 ++++++++++++++++++++++
 4 files changed, 83 insertions(+), 27 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index d828a5054e3..e4363d7964d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -35,6 +35,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.sql.Connection;
+import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -209,30 +211,16 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
     final String receiverIp = receiverDataNode.getIp();
     final int receiverPort = receiverDataNode.getPort();
 
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')",
+              receiverIp, receiverPort));
+    }
+
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      final Map<String, String> extractorAttributes = new HashMap<>();
-      final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      extractorAttributes.put("source.realtime.mode", "log");
-
-      connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
-      connectorAttributes.put("sink.batch.enable", "false");
-      connectorAttributes.put("sink.ip", receiverIp);
-      connectorAttributes.put("sink.port", Integer.toString(receiverPort));
-
-      // This version does not matter since it's no longer checked by the legacy receiver
-      connectorAttributes.put("sink.version", "1.3");
-
-      final TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("testPipe", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
-
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
index 74c5598d616..4d739a013d3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
@@ -31,13 +31,14 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
 import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.external.commons.lang3.SystemUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.File;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index b3792bc9320..5030738cc82 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -109,7 +110,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
   private String syncConnectorVersion;
 
   private String pipeName;
-  private String databaseName;
+  private String databaseName = "";
 
   private IoTDBSyncClient client;
 
@@ -202,10 +203,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
     trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
 
-    databaseName =
+    final DataRegion dataRegion =
         StorageEngine.getInstance()
-            .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
-            .getDatabaseName();
+            .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
+    if (Objects.nonNull(dataRegion)) {
+      databaseName = dataRegion.getDatabaseName();
+    }
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java
new file mode 100644
index 00000000000..3019c47cf0e
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils;
+
+import org.apache.commons.lang3.SystemUtils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WindowsOSUtils {
+  private static final String ILLEGAL_WINDOWS_CHARS = "\\/:*?\"<>|";
+  private static final Set<String> ILLEGAL_WINDOWS_NAMES =
+      new HashSet<>(Arrays.asList("CON", "PRN", "AUX", "NUL", "COM1-COM9, LPT1-LPT9"));
+
+  static {
+    for (int i = 0; i < 10; ++i) {
+      ILLEGAL_WINDOWS_NAMES.add("COM" + i);
+      ILLEGAL_WINDOWS_NAMES.add("LPT" + i);
+    }
+  }
+
+  public static final String OS_SEGMENT_ERROR =
+      String.format(
+          "In Windows System, the path shall not contains %s, equals one of %s, or ends with '.' or ' '.",
+          ILLEGAL_WINDOWS_CHARS, ILLEGAL_WINDOWS_NAMES);
+
+  public static boolean isLegalPathSegment4Windows(final String pathSegment) {
+    if (!SystemUtils.IS_OS_WINDOWS) {
+      return true;
+    }
+    for (final char illegalChar : ILLEGAL_WINDOWS_CHARS.toCharArray()) {
+      if (pathSegment.indexOf(illegalChar) != -1) {
+        return false;
+      }
+    }
+    if (pathSegment.endsWith(".") || pathSegment.endsWith(" ")) {
+      return false;
+    }
+    for (final String illegalName : ILLEGAL_WINDOWS_NAMES) {
+      if (pathSegment.equalsIgnoreCase(illegalName)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Reply via email to