This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 251125ae7f4 Fix pipe tsfile receiver database handling (#17815) (#17836)
251125ae7f4 is described below
commit 251125ae7f4e7618b82f6808e9fe634e939bce02
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 4 10:32:53 2026 +0800
Fix pipe tsfile receiver database handling (#17815) (#17836)
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +
.../protocol/thrift/IoTDBDataNodeReceiver.java | 88 ++++++++++++-----
.../visitor/PipeStatementExceptionVisitor.java | 8 ++
.../request/PipeTransferTsFileSealWithModReq.java | 68 ++++++++++++-
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 11 ++-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 6 +-
.../async/handler/PipeTransferTsFileHandler.java | 17 +++-
.../thrift/sync/IoTDBDataRegionSyncSink.java | 20 +++-
.../plan/statement/crud/LoadTsFileStatement.java | 39 ++++++++
.../load/active/ActiveLoadPathHelper.java | 30 ++++++
.../load/config/LoadTsFileConfigurator.java | 7 ++
.../receiver/PipeStatementTsStatusVisitorTest.java | 14 +++
.../protocol/thrift/IoTDBDataNodeReceiverTest.java | 110 +++++++++++++++++++++
13 files changed, 385 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8ffbc9f2f9b..c9cd2d44af0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -265,6 +265,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
return tsFile;
}
+ public String getDatabaseName() {
+ return Objects.isNull(resource) ? null : resource.getDatabaseName();
+ }
+
public File getModFile() {
return modFile;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 58d4c29eddc..1ec4d6d53b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -52,6 +52,7 @@ import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisito
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
@@ -459,29 +460,31 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final
String fileAbsolutePath)
throws IOException {
return isUsingAsyncLoadTsFileStrategy.get()
- ? loadTsFileAsync(Collections.singletonList(fileAbsolutePath))
- : loadTsFileSync(fileAbsolutePath);
+ ? loadTsFileAsync(null, Collections.singletonList(fileAbsolutePath))
+ : loadTsFileSync(null, fileAbsolutePath);
}
@Override
protected TSStatus loadFileV2(
final PipeTransferFileSealReqV2 req, final List<String>
fileAbsolutePaths)
throws IOException, IllegalPathException {
- return req instanceof PipeTransferTsFileSealWithModReq
- // TsFile's absolute path will be the second element
- ? (isUsingAsyncLoadTsFileStrategy.get()
- ? loadTsFileAsync(fileAbsolutePaths)
- : loadTsFileSync(fileAbsolutePaths.get(1)))
- : loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
+ if (req instanceof PipeTransferTsFileSealWithModReq) {
+ final String dataBaseName =
+ ((PipeTransferTsFileSealWithModReq)
req).getDatabaseNameByTsFileName();
+ return isUsingAsyncLoadTsFileStrategy.get()
+ ? loadTsFileAsync(dataBaseName, fileAbsolutePaths)
+ : loadTsFileSync(dataBaseName,
fileAbsolutePaths.get(req.getFileNames().size() - 1));
+ }
+ return loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
}
- private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws
IOException {
+ private TSStatus loadTsFileAsync(final String dataBaseName, final
List<String> absolutePaths)
+ throws IOException {
final Map<String, String> loadAttributes =
- ActiveLoadPathHelper.buildAttributes(
- null,
+ buildLoadTsFileAttributesForAsync(
+ dataBaseName,
shouldConvertDataTypeOnTypeMismatch,
validateTsFile.get(),
- null,
shouldMarkAsPipeRequest.get());
if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths,
true)) {
throw new PipeException("Load active listening pipe dir is not set.");
@@ -489,15 +492,38 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- private TSStatus loadTsFileSync(final String fileAbsolutePath) throws
FileNotFoundException {
+ static Map<String, String> buildLoadTsFileAttributesForAsync(
+ final String dataBaseName,
+ final boolean shouldConvertDataTypeOnTypeMismatch,
+ final boolean validateTsFile,
+ final boolean shouldMarkAsPipeRequest) {
+ return ActiveLoadPathHelper.buildAttributes(
+ dataBaseName,
+ LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName),
+ shouldConvertDataTypeOnTypeMismatch,
+ validateTsFile,
+ null,
+ shouldMarkAsPipeRequest);
+ }
+
+ private TSStatus loadTsFileSync(final String dataBaseName, final String
fileAbsolutePath)
+ throws FileNotFoundException {
+ return executeStatementAndClassifyExceptions(
+ buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath,
validateTsFile.get()));
+ }
+
+ static LoadTsFileStatement buildLoadTsFileStatementForSync(
+ final String dataBaseName, final String fileAbsolutePath, final boolean
validateTsFile)
+ throws FileNotFoundException {
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
- statement.setVerifySchema(validateTsFile.get());
+ statement.setVerifySchema(validateTsFile);
statement.setAutoCreateDatabase(
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
-
- return executeStatementAndClassifyExceptions(statement);
+ statement.setDatabase(dataBaseName);
+ statement.updateDatabaseLevelByTreeDatabase();
+ return statement;
}
private TSStatus loadSchemaSnapShot(
@@ -704,12 +730,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return STATEMENT_STATUS_VISITOR.process(statement, result);
}
} catch (final Exception e) {
- PipeLogger.log(
- LOGGER::warn,
- e,
- "Receiver id = %s: Exception encountered while executing statement
%s: ",
- receiverId.get(),
- statement.getPipeLoggingString());
+ logStatementExceptionIfNecessary(statement, e);
return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
} finally {
if (Objects.nonNull(allocatedMemoryBlock)) {
@@ -719,6 +740,29 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
}
+ private void logStatementExceptionIfNecessary(final Statement statement,
final Exception e) {
+ if (shouldLogStatementException(receiverId.get(), statement, e)) {
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Exception encountered while executing statement
%s: ",
+ receiverId.get(),
+ Objects.isNull(statement) ? null : statement.getPipeLoggingString());
+ }
+ }
+
+ static boolean shouldLogStatementException(
+ final long receiverId, final Statement statement, final Exception e) {
+ // Use the reducer cache as a gate. The actual stack trace is logged only when it passes.
+ return PipePeriodicalLogReducer.log(
+ message -> {},
+ "Receiver id = %s, statement = %s, exception = %s, message = %s",
+ receiverId,
+ Objects.isNull(statement) ? null : statement.getPipeLoggingString(),
+ e.getClass().getName(),
+ e.getMessage());
+ }
+
private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement
statement) {
if (statement == null) {
return RpcUtils.getStatus(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 098c983977a..7b8246e3dab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -48,6 +49,13 @@ import java.util.Objects;
public class PipeStatementExceptionVisitor extends StatementVisitor<TSStatus,
Exception> {
@Override
public TSStatus visitNode(final StatementNode node, final Exception context)
{
+ if (context instanceof IoTDBRuntimeException
+ && ((IoTDBRuntimeException) context).getErrorCode()
+ == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(context.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
index 28959a1a090..7d0aa99cb13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTsFileSealWithModReq.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
public class PipeTransferTsFileSealWithModReq extends
PipeTransferFileSealReqV2 {
@@ -38,17 +40,59 @@ public class PipeTransferTsFileSealWithModReq extends
PipeTransferFileSealReqV2
return PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD;
}
+ private static final String DATABASE_NAME_KEY_PREFIX = "DATABASE_NAME_";
+
+ public String getDatabaseNameByTsFileName() {
+ return getParameters() == null
+ ? null
+ : getParameters()
+ .get(
+
generateDatabaseNameWithFileNameKey(getFileNames().get(getFileNames().size() -
1)));
+ }
+
+ private static String generateDatabaseNameWithFileNameKey(final String
fileName) {
+ return DATABASE_NAME_KEY_PREFIX + fileName;
+ }
+
+ private static Map<String, String> generateDatabaseNameParameter(
+ final String tsFileName, final String dataBaseName) {
+ return dataBaseName == null
+ ? new HashMap<>()
+ :
Collections.singletonMap(generateDatabaseNameWithFileNameKey(tsFileName),
dataBaseName);
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
String modFileName, long modFileLength, String tsFileName, long
tsFileLength)
throws IOException {
+ return toTPipeTransferReq(modFileName, modFileLength, tsFileName,
tsFileLength, null);
+ }
+
+ public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
+ final String modFileName,
+ final long modFileLength,
+ final String tsFileName,
+ final long tsFileLength,
+ final String dataBaseName)
+ throws IOException {
return (PipeTransferTsFileSealWithModReq)
new PipeTransferTsFileSealWithModReq()
.convertToTPipeTransferReq(
Arrays.asList(modFileName, tsFileName),
Arrays.asList(modFileLength, tsFileLength),
- new HashMap<>());
+ generateDatabaseNameParameter(tsFileName, dataBaseName));
+ }
+
+ public static PipeTransferTsFileSealWithModReq toTPipeTransferReq(
+ final String tsFileName, final long tsFileLength, final String
dataBaseName)
+ throws IOException {
+ return (PipeTransferTsFileSealWithModReq)
+ new PipeTransferTsFileSealWithModReq()
+ .convertToTPipeTransferReq(
+ Collections.singletonList(tsFileName),
+ Collections.singletonList(tsFileLength),
+ generateDatabaseNameParameter(tsFileName, dataBaseName));
}
public static PipeTransferTsFileSealWithModReq
fromTPipeTransferReq(TPipeTransferReq req) {
@@ -61,11 +105,31 @@ public class PipeTransferTsFileSealWithModReq extends
PipeTransferFileSealReqV2
public static byte[] toTPipeTransferBytes(
String modFileName, long modFileLength, String tsFileName, long
tsFileLength)
throws IOException {
+ return toTPipeTransferBytes(modFileName, modFileLength, tsFileName,
tsFileLength, null);
+ }
+
+ public static byte[] toTPipeTransferBytes(
+ final String modFileName,
+ final long modFileLength,
+ final String tsFileName,
+ final long tsFileLength,
+ final String dataBaseName)
+ throws IOException {
return new PipeTransferTsFileSealWithModReq()
.convertToTPipeTransferSnapshotSealBytes(
Arrays.asList(modFileName, tsFileName),
Arrays.asList(modFileLength, tsFileLength),
- new HashMap<>());
+ generateDatabaseNameParameter(tsFileName, dataBaseName));
+ }
+
+ public static byte[] toTPipeTransferBytes(
+ final String tsFileName, final long tsFileLength, final String
dataBaseName)
+ throws IOException {
+ return new PipeTransferTsFileSealWithModReq()
+ .convertToTPipeTransferSnapshotSealBytes(
+ Collections.singletonList(tsFileName),
+ Collections.singletonList(tsFileLength),
+ generateDatabaseNameParameter(tsFileName, dataBaseName));
}
/////////////////////////////// Object ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 81e745dc6a7..b2af96a37c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -255,7 +255,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final Map<Pair<String, Long>, Double> pipe2WeightMap =
batchToTransfer.deepCopyPipe2WeightMap();
for (final File tsFile : sealedFiles) {
- doTransfer(pipe2WeightMap, socket, tsFile, null, tsFile.getName());
+ doTransfer(pipe2WeightMap, socket, tsFile, null, null, tsFile.getName());
try {
RetryUtils.retryOnException(
() -> {
@@ -379,6 +379,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver
? pipeTsFileInsertionEvent.getModFile()
: null,
+ pipeTsFileInsertionEvent.getDatabaseName(),
pipeTsFileInsertionEvent.toString());
}
@@ -387,6 +388,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
final AirGapSocket socket,
final File tsFile,
final File modFile,
+ final String dataBaseName,
final String receiverStatusContext)
throws PipeException, IOException {
final String errorMessage = String.format("Seal file %s error. Socket
%s.", tsFile, socket);
@@ -397,7 +399,7 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
if (!sendWeighted(
socket,
PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()),
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length(), dataBaseName),
pipe2WeightMap)) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
@@ -411,7 +413,10 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
transferFilePieces(pipe2WeightMap, tsFile, socket, false);
if (!sendWeighted(
socket,
- PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(),
tsFile.length()),
+ dataBaseName == null
+ ?
PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(),
tsFile.length())
+ : PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
+ tsFile.getName(), tsFile.length(), dataBaseName),
pipe2WeightMap)) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 9d0c1563c78..b61cb4543c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -255,7 +255,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
eventsHadBeenAddedToRetryQueue,
sealedFile,
null,
- false));
+ false,
+ null));
}
} catch (final Exception e) {
PipeLogger.log(LOGGER::warn, e, "Failed to transfer tsfile batch
(%s).", sealedFiles);
@@ -400,7 +401,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
pipeTsFileInsertionEvent.getTsFile(),
pipeTsFileInsertionEvent.getModFile(),
pipeTsFileInsertionEvent.isWithMod()
- && clientManager.supportModsIfIsDataNodeReceiver());
+ && clientManager.supportModsIfIsDataNodeReceiver(),
+ pipeTsFileInsertionEvent.getDatabaseName());
transfer(pipeTransferTsFileHandler);
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 8d9648f5292..d7515141dae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -78,6 +78,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
private File currentFile;
private final boolean transferMod;
+ private final String dataBaseName;
private final int readFileBufferSize;
private PipeTsFileMemoryBlock memoryBlock;
@@ -98,7 +99,8 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
final AtomicBoolean eventsHadBeenAddedToRetryQueue,
final File tsFile,
final File modFile,
- final boolean transferMod)
+ final boolean transferMod,
+ final String dataBaseName)
throws InterruptedException {
super(connector);
@@ -111,6 +113,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
this.tsFile = tsFile;
this.modFile = modFile;
this.transferMod = transferMod;
+ this.dataBaseName = dataBaseName;
currentFile = transferMod ? modFile : tsFile;
// NOTE: Waiting for resource enough for slicing here may cause deadlock!
@@ -191,8 +194,16 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
final TPipeTransferReq uncompressedReq =
transferMod
? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())
- :
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
+ modFile.getName(),
+ modFile.length(),
+ tsFile.getName(),
+ tsFile.length(),
+ dataBaseName)
+ : dataBaseName == null
+ ? PipeTransferTsFileSealReq.toTPipeTransferReq(
+ tsFile.getName(), tsFile.length())
+ : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ tsFile.getName(), tsFile.length(), dataBaseName);
final TPipeTransferReq req = sink.compressIfNeeded(uncompressedReq);
pipeName2WeightMap.forEach(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index ef3d59f0d2a..1bb0c383ff8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -275,7 +275,7 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
final Map<Pair<String, Long>, Double> pipe2WeightMap =
batchToTransfer.deepCopyPipe2WeightMap();
for (final File tsFile : sealedFiles) {
- doTransfer(pipe2WeightMap, tsFile, null);
+ doTransfer(pipe2WeightMap, tsFile, null, null);
try {
RetryUtils.retryOnException(
() -> {
@@ -428,7 +428,8 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
pipeTsFileInsertionEvent.getCreationTime()),
1.0),
pipeTsFileInsertionEvent.getTsFile(),
- pipeTsFileInsertionEvent.isWithMod() ?
pipeTsFileInsertionEvent.getModFile() : null);
+ pipeTsFileInsertionEvent.isWithMod() ?
pipeTsFileInsertionEvent.getModFile() : null,
+ pipeTsFileInsertionEvent.getDatabaseName());
} finally {
pipeTsFileInsertionEvent.decreaseReferenceCount(
IoTDBDataRegionSyncSink.class.getName(), false);
@@ -438,7 +439,8 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
private void doTransfer(
final Map<Pair<String, Long>, Double> pipeName2WeightMap,
final File tsFile,
- final File modFile)
+ final File modFile,
+ final String dataBaseName)
throws PipeException, IOException {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
@@ -454,7 +456,11 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()));
+ modFile.getName(),
+ modFile.length(),
+ tsFile.getName(),
+ tsFile.length(),
+ dataBaseName));
pipeName2WeightMap.forEach(
(pipePair, weight) ->
@@ -479,7 +485,11 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
try {
final TPipeTransferReq req =
compressIfNeeded(
- PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(),
tsFile.length()));
+ dataBaseName == null
+ ? PipeTransferTsFileSealReq.toTPipeTransferReq(
+ tsFile.getName(), tsFile.length())
+ : PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ tsFile.getName(), tsFile.length(), dataBaseName));
pipeName2WeightMap.forEach(
(pipePair, weight) ->
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 6e74ceed206..404b957c786 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -42,10 +44,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+
public class LoadTsFileStatement extends Statement {
private final File file;
private int databaseLevel;
+ private String database;
private boolean verifySchema = true;
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
@@ -201,6 +206,14 @@ public class LoadTsFileStatement extends Statement {
return databaseLevel;
}
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
public void setVerifySchema(boolean verifySchema) {
this.verifySchema = verifySchema;
}
@@ -281,6 +294,7 @@ public class LoadTsFileStatement extends Statement {
private void initAttributes(final Map<String, String> loadAttributes) {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
+ this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
@@ -293,6 +307,28 @@ public class LoadTsFileStatement extends Statement {
}
}
+ public void updateDatabaseLevelByTreeDatabase() {
+ final Integer databaseLevel = getDatabaseLevelByTreeDatabase(database);
+ if (databaseLevel != null) {
+ this.databaseLevel = databaseLevel;
+ }
+ }
+
+ public static Integer getDatabaseLevelByTreeDatabase(final String database) {
+ if (database == null) {
+ return null;
+ }
+ try {
+ final String[] nodes = PathUtils.splitPathToDetachedNodes(database);
+ if (nodes.length > 1 && PATH_ROOT.equals(nodes[0])) {
+ return nodes.length - 1;
+ }
+ } catch (final IllegalPathException ignored) {
+ // Keep the configured database level when database is not a legal tree path.
+ }
+ return null;
+ }
+
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
int lastNonMiniTsFileIndex = -1;
@@ -352,6 +388,7 @@ public class LoadTsFileStatement extends Statement {
final LoadTsFileStatement statement = new LoadTsFileStatement();
statement.databaseLevel = this.databaseLevel;
+ statement.database = this.database;
statement.verifySchema = this.verifySchema;
statement.deleteAfterLoad = this.deleteAfterLoad;
statement.convertOnTypeMismatch = this.convertOnTypeMismatch;
@@ -395,6 +432,8 @@ public class LoadTsFileStatement extends Statement {
+ deleteAfterLoad
+ ", database-level="
+ databaseLevel
+ + ", database="
+ + database
+ ", verify-schema="
+ verifySchema
+ ", convert-on-type-mismatch="
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
index 965f2941dc6..2503b822b9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPathHelper.java
@@ -46,6 +46,7 @@ public final class ActiveLoadPathHelper {
private static final List<String> KEY_ORDER =
Collections.unmodifiableList(
Arrays.asList(
+ LoadTsFileConfigurator.DATABASE_NAME_KEY,
LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY,
LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY,
@@ -62,8 +63,28 @@ public final class ActiveLoadPathHelper {
final Boolean verify,
final Long tabletConversionThresholdBytes,
final Boolean pipeGenerated) {
+ return buildAttributes(
+ null,
+ databaseLevel,
+ convertOnTypeMismatch,
+ verify,
+ tabletConversionThresholdBytes,
+ pipeGenerated);
+ }
+
+ public static Map<String, String> buildAttributes(
+ final String databaseName,
+ final Integer databaseLevel,
+ final Boolean convertOnTypeMismatch,
+ final Boolean verify,
+ final Long tabletConversionThresholdBytes,
+ final Boolean pipeGenerated) {
final Map<String, String> attributes = new LinkedHashMap<>();
+ if (Objects.nonNull(databaseName) && !databaseName.isEmpty()) {
+ attributes.put(LoadTsFileConfigurator.DATABASE_NAME_KEY, databaseName);
+ }
+
if (Objects.nonNull(databaseLevel)) {
attributes.put(LoadTsFileConfigurator.DATABASE_LEVEL_KEY,
databaseLevel.toString());
}
@@ -149,6 +170,10 @@ public final class ActiveLoadPathHelper {
final LoadTsFileStatement statement,
final boolean defaultVerify) {
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY))
+ .filter(name -> !name.isEmpty())
+ .ifPresent(statement::setDatabase);
+
Optional.ofNullable(attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY))
.ifPresent(
level -> {
@@ -216,6 +241,11 @@ public final class ActiveLoadPathHelper {
private static void validateAttributeValue(final String key, final String
value) {
switch (key) {
+ case LoadTsFileConfigurator.DATABASE_NAME_KEY:
+ if (value == null || value.isEmpty()) {
+ throw new SemanticException("Database name must not be empty.");
+ }
+ break;
case LoadTsFileConfigurator.DATABASE_LEVEL_KEY:
LoadTsFileConfigurator.validateDatabaseLevelParam(value);
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 510d47b0b23..8b689c6fb22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -40,6 +40,7 @@ public class LoadTsFileConfigurator {
case ON_SUCCESS_KEY:
validateOnSuccessParam(value);
break;
+ case DATABASE_NAME_KEY:
case TABLET_CONVERSION_THRESHOLD_KEY:
break;
case CONVERT_ON_TYPE_MISMATCH_KEY:
@@ -87,6 +88,12 @@ public class LoadTsFileConfigurator {
DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE)));
}
+ public static final String DATABASE_NAME_KEY = "database-name";
+
+ public static String parseDatabaseName(final Map<String, String>
loadAttributes) {
+ return loadAttributes.get(DATABASE_NAME_KEY);
+ }
+
public static final String ON_SUCCESS_KEY = "on-success";
public static final String ON_SUCCESS_DELETE_VALUE = "delete";
public static final String ON_SUCCESS_NONE_VALUE = "none";
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 2b20f1d91ef..756d1181825 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
@@ -62,4 +63,17 @@ public class PipeStatementTsStatusVisitorTest {
 StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
.getCode());
}
+
+ @Test
+ public void testDatabaseNotExistRuntimeExceptionClassification() {
+ Assert.assertEquals(
+ TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR
+ .process(
+ new InsertRowsStatement(),
+ new IoTDBRuntimeException(
+ "Create DataPartition failed because the database: root.test.sg_0 is not exists",
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()))
+ .getCode());
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
new file mode 100644
index 00000000000..f41c44763f9
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
+import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+public class IoTDBDataNodeReceiverTest {
+
+ @Test
+ public void testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() throws Exception {
+ final Path tsFile = Files.createTempFile("pipe-load-tree-database-level", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), true);
+
+ Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+ Assert.assertEquals(2, statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() throws Exception {
+ final Path tsFile = Files.createTempFile("pipe-async-load-tree-database-level", ".tsfile");
+ try {
+ final Map<String, String> attributes =
+ IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync(
+ "root.test.sg_0", true, true, true);
+
+ Assert.assertEquals(
+ "root.test.sg_0", attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY));
+ Assert.assertEquals("2", attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY));
+
+ final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(tsFile.toString());
+ ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, true);
+ Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+ Assert.assertEquals(2, statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseNameIsNull()
+ throws Exception {
+ final Path tsFile = Files.createTempFile("pipe-load-default-database-level", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, tsFile.toString(), true);
+
+ Assert.assertNull(statement.getDatabase());
+ Assert.assertEquals(
+ IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
+ statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void testRepeatedStatementExceptionLogIsReduced() throws Exception {
+ final Path tsFile = Files.createTempFile("pipe-load-log-reducer", ".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), true);
+ final long receiverId = System.nanoTime();
+ final Exception exception = new RuntimeException("repeated receiver exception " + receiverId);
+
+ Assert.assertTrue(
+ IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception));
+ Assert.assertFalse(
+ IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception));
+ Assert.assertTrue(
+ IoTDBDataNodeReceiver.shouldLogStatementException(
+ receiverId, statement, new RuntimeException("another receiver exception")));
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+}