This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.3.7 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 59f911b0498b9961b94f0e585c99ebc8dd155b39 Author: Caideyipi <[email protected]> AuthorDate: Wed Feb 4 16:20:55 2026 +0800 Fixed the NPE when validating legacy sink (#17153) (#17161) * npe-fix * unb --- .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 33 ++++++++-------------- .../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 11 +++++--- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 09fd6ba6d4e..d59201f5605 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; @@ -33,6 +34,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.sql.Connection; +import java.sql.Statement; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -200,30 +203,16 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT { final String receiverIp = receiverDataNode.getIp(); final int receiverPort = receiverDataNode.getPort(); + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')", + receiverIp, receiverPort)); + } + try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - final Map<String, String> extractorAttributes = new HashMap<>(); - final Map<String, String> processorAttributes = new HashMap<>(); - final Map<String, String> connectorAttributes = new HashMap<>(); - - extractorAttributes.put("source.realtime.mode", "log"); - - connectorAttributes.put("sink", "iotdb-legacy-pipe-sink"); - connectorAttributes.put("sink.batch.enable", "false"); - connectorAttributes.put("sink.ip", receiverIp); - connectorAttributes.put("sink.port", Integer.toString(receiverPort)); - - // This version does not matter since it's no longer checked by the legacy receiver - connectorAttributes.put("sink.version", "1.3"); - - final TSStatus status = - client.createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(extractorAttributes) - .setProcessorAttributes(processorAttributes)); - - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index a80293c9467..406c5cb4fdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -107,7 +108,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector { private String syncConnectorVersion; private String pipeName; - private String databaseName; + private String databaseName = ""; private IoTDBSyncClient client; private SessionPool sessionPool; @@ -199,10 +200,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector { trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); - databaseName = + final DataRegion dataRegion = StorageEngine.getInstance() - .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())) - .getDatabaseName(); + .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())); + if (Objects.nonNull(dataRegion)) { + databaseName = dataRegion.getDatabaseName(); + } } @Override
