This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b46c58b324d Fix pipe tsfile receiver database handling (#17815)
b46c58b324d is described below
commit b46c58b324db40c4a93428bc30b35bc326ff3479
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 16:26:42 2026 +0800
Fix pipe tsfile receiver database handling (#17815)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 62 +++++++++---
.../visitor/PipeStatementExceptionVisitor.java | 7 ++
.../plan/statement/crud/LoadTsFileStatement.java | 25 +++++
.../receiver/PipeStatementTsStatusVisitorTest.java | 14 +++
.../protocol/thrift/IoTDBDataNodeReceiverTest.java | 110 +++++++++++++++++++++
5 files changed, 206 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 55f46be8f99..37b277efbaf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -59,6 +59,7 @@ import
org.apache.iotdb.db.pipe.receiver.visitor.PipeTableStatementDataTypeConve
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
@@ -568,12 +569,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus loadTsFileAsync(final String dataBaseName, final
List<String> absolutePaths)
throws IOException {
final Map<String, String> loadAttributes =
- ActiveLoadPathHelper.buildAttributes(
+ buildLoadTsFileAttributesForAsync(
dataBaseName,
- null,
shouldConvertDataTypeOnTypeMismatch,
validateTsFile.get(),
- null,
shouldMarkAsPipeRequest.get());
if (!LoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
@@ -582,17 +581,38 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+ static Map<String, String> buildLoadTsFileAttributesForAsync(
+ final String dataBaseName,
+ final boolean shouldConvertDataTypeOnTypeMismatch,
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
+ return ActiveLoadPathHelper.buildAttributes(
+ dataBaseName,
+ LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
+ shouldConvertDataTypeOnTypeMismatch,
+ validateTsFile,
+ null,
+ shouldMarkAsPipeRequest);
+ }
+
private TSStatus loadTsFileSync(final String dataBaseName, final String
fileAbsolutePath)
throws FileNotFoundException {
+ return executeStatementAndClassifyExceptions(
+ buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath,
validateTsFile.get()));
+ }
+
+ static LoadTsFileStatement buildLoadTsFileStatementForSync(
+ final String dataBaseName, final String fileAbsolutePath, final boolean
validateTsFile)
+ throws FileNotFoundException {
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(validateTsFile.get());
+ statement.setVerifySchema(validateTsFile);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
statement.setDatabase(dataBaseName);
-
- return executeStatementAndClassifyExceptions(statement);
+ statement.updateDatabaseLevelByTreeDatabase();
+ return statement;
}
private TSStatus loadSchemaSnapShot(
@@ -845,12 +865,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return STATEMENT_STATUS_VISITOR.process(statement, result);
}
} catch (final Exception e) {
- PipeLogger.log(
- LOGGER::warn,
- e,
- "Receiver id = %s: Exception encountered while executing statement
%s: ",
- receiverId.get(),
- statement.getPipeLoggingString());
+ logStatementExceptionIfNecessary(statement, e);
return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
} finally {
if (Objects.nonNull(allocatedMemoryBlock)) {
@@ -860,6 +875,29 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
}
+ private void logStatementExceptionIfNecessary(final Statement statement,
final Exception e) {
+ if (shouldLogStatementException(receiverId.get(), statement, e)) {
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Exception encountered while executing statement
%s: ",
+ receiverId.get(),
+ Objects.isNull(statement) ? null : statement.getPipeLoggingString());
+ }
+ }
+
+ static boolean shouldLogStatementException(
+ final long receiverId, final Statement statement, final Exception e) {
+ // Use the reducer cache as a gate. The actual stack trace is logged only
when it passes.
+ return PipePeriodicalLogReducer.log(
+ message -> {},
+ "Receiver id = %s, statement = %s, exception = %s, message = %s",
+ receiverId,
+ Objects.isNull(statement) ? null : statement.getPipeLoggingString(),
+ e.getClass().getName(),
+ e.getMessage());
+ }
+
private TSStatus
executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
final Statement statement) {
if (statement == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 42b1f0e5b1e..8e590c5847c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.SemanticException;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
@@ -52,6 +53,12 @@ public class PipeStatementExceptionVisitor extends
StatementVisitor<TSStatus, Ex
if (context instanceof AccessDeniedException) {
return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode())
.setMessage(context.getMessage());
+ } else if (context instanceof IoTDBRuntimeException
+ && ((IoTDBRuntimeException) context).getErrorCode()
+ == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
}
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(context.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 59c7f9a57ef..73133611ed2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -44,6 +46,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ASYNC_LOAD_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
@@ -337,6 +340,28 @@ public class LoadTsFileStatement extends Statement {
}
}
+ public void updateDatabaseLevelByTreeDatabase() {
+ final Integer databaseLevel = getDatabaseLevelByTreeDatabase(database);
+ if (databaseLevel != null) {
+ this.databaseLevel = databaseLevel;
+ }
+ }
+
+ public static Integer getDatabaseLevelByTreeDatabase(final String database) {
+ if (database == null) {
+ return null;
+ }
+ try {
+ final String[] nodes = PathUtils.splitPathToDetachedNodes(database);
+ if (nodes.length > 1 && PATH_ROOT.equals(nodes[0])) {
+ return nodes.length - 1;
+ }
+ } catch (final IllegalPathException ignored) {
+ // Keep the configured database level when database is not a legal tree
path.
+ }
+ return null;
+ }
+
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
int lastNonMiniTsFileIndex = -1;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 2b20f1d91ef..756d1181825 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
@@ -62,4 +63,17 @@ public class PipeStatementTsStatusVisitorTest {
StatusUtils.OK, new
TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
.getCode());
}
+
+ @Test
+ public void testDatabaseNotExistRuntimeExceptionClassification() {
+ Assert.assertEquals(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR
+ .process(
+ new InsertRowsStatement(),
+ new IoTDBRuntimeException(
+ "Create DataPartition failed because the database:
root.test.sg_0 is not exists",
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()))
+ .getCode());
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
new file mode 100644
index 00000000000..f41c44763f9
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+public class IoTDBDataNodeReceiverTest {
+
+ @Test
+ public void
testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() throws
Exception {
+ final Path tsFile = Files.createTempFile("pipe-load-tree-database-level",
".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), true);
+
+ Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+ Assert.assertEquals(2, statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void
testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() throws
Exception {
+ final Path tsFile =
Files.createTempFile("pipe-async-load-tree-database-level", ".tsfile");
+ try {
+ final Map<String, String> attributes =
+ IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
+ "root.test.sg_0", true, true, true);
+
+ Assert.assertEquals(
+ "root.test.sg_0",
attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
+ Assert.assertEquals("2",
attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
+
+ final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(tsFile.toString());
+ ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement,
true);
+ Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+ Assert.assertEquals(2, statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void
testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseNameIsNull()
+ throws Exception {
+ final Path tsFile =
Files.createTempFile("pipe-load-default-database-level", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null,
tsFile.toString(), true);
+
+ Assert.assertNull(statement.getDatabase());
+ Assert.assertEquals(
+ IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
+ statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void testRepeatedStatementExceptionLogIsReduced() throws Exception {
+ final Path tsFile = Files.createTempFile("pipe-load-log-reducer",
".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), true);
+ final long receiverId = System.nanoTime();
+ final Exception exception = new RuntimeException("repeated receiver
exception " + receiverId);
+
+ Assert.assertTrue(
+ IoTDBDataNodeReceiver.shouldLogStatementException(receiverId,
statement, exception));
+ Assert.assertFalse(
+ IoTDBDataNodeReceiver.shouldLogStatementException(receiverId,
statement, exception));
+ Assert.assertTrue(
+ IoTDBDataNodeReceiver.shouldLogStatementException(
+ receiverId, statement, new RuntimeException("another receiver
exception")));
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+}