This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.7 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8fe6f510c91c33f905e9e771e963eb6678562639 Author: Caideyipi <[email protected]> AuthorDate: Wed Feb 4 14:14:58 2026 +0800 Fixed the NPE when validating legacy sink (#17153) * npe-fix * unb (cherry picked from commit c1f4682c207dd9e45fe363d46ccae927f76fee9d) --- .../treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 32 ++++------- .../treemodel/auto/basic/IoTDBPipeSyntaxIT.java | 3 +- .../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 11 ++-- .../apache/iotdb/commons/utils/WindowsOSUtils.java | 64 ++++++++++++++++++++++ 4 files changed, 83 insertions(+), 27 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index d828a5054e3..e4363d7964d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -35,6 +35,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.sql.Connection; +import java.sql.Statement; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -209,30 +211,16 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT { final String receiverIp = receiverDataNode.getIp(); final int receiverPort = receiverDataNode.getPort(); + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')", + receiverIp, receiverPort)); + } + try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - final Map<String, String> extractorAttributes = new HashMap<>(); - final Map<String, String> processorAttributes = new HashMap<>(); - final Map<String, String> connectorAttributes = new HashMap<>(); - - extractorAttributes.put("source.realtime.mode", "log"); - - connectorAttributes.put("sink", "iotdb-legacy-pipe-sink"); - connectorAttributes.put("sink.batch.enable", "false"); - connectorAttributes.put("sink.ip", receiverIp); - connectorAttributes.put("sink.port", Integer.toString(receiverPort)); - - // This version does not matter since it's no longer checked by the legacy receiver - connectorAttributes.put("sink.version", "1.3"); - - final TSStatus status = - client.createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(extractorAttributes) - .setProcessorAttributes(processorAttributes)); - - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java index 74c5598d616..4d739a013d3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java @@ -31,13 +31,14 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.external.commons.lang3.SystemUtils; +import org.apache.commons.lang3.SystemUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.File; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index b3792bc9320..5030738cc82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -109,7 +110,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector { private String syncConnectorVersion; private String pipeName; - private String databaseName; + private String databaseName = ""; private IoTDBSyncClient client; @@ -202,10 +203,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector { trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); - databaseName = + final DataRegion dataRegion = StorageEngine.getInstance() - .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())) - .getDatabaseName(); + .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())); + if (Objects.nonNull(dataRegion)) { + databaseName = dataRegion.getDatabaseName(); + } } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java new file mode 100644 index 00000000000..3019c47cf0e --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/WindowsOSUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils; + +import org.apache.commons.lang3.SystemUtils; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +public class WindowsOSUtils { + private static final String ILLEGAL_WINDOWS_CHARS = "\\/:*?\"<>|"; + private static final Set<String> ILLEGAL_WINDOWS_NAMES = + new HashSet<>(Arrays.asList("CON", "PRN", "AUX", "NUL", "COM1-COM9, LPT1-LPT9")); + + static { + for (int i = 0; i < 10; ++i) { + ILLEGAL_WINDOWS_NAMES.add("COM" + i); + ILLEGAL_WINDOWS_NAMES.add("LPT" + i); + } + } + + public static final String OS_SEGMENT_ERROR = + String.format( + "In Windows System, the path shall not contains %s, equals one of %s, or ends with '.' or ' '.", + ILLEGAL_WINDOWS_CHARS, ILLEGAL_WINDOWS_NAMES); + + public static boolean isLegalPathSegment4Windows(final String pathSegment) { + if (!SystemUtils.IS_OS_WINDOWS) { + return true; + } + for (final char illegalChar : ILLEGAL_WINDOWS_CHARS.toCharArray()) { + if (pathSegment.indexOf(illegalChar) != -1) { + return false; + } + } + if (pathSegment.endsWith(".") || pathSegment.endsWith(" ")) { + return false; + } + for (final String illegalName : ILLEGAL_WINDOWS_NAMES) { + if (pathSegment.equalsIgnoreCase(illegalName)) { + return false; + } + } + return true; + } +}
