This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 99ada4fbe40 Load: Convert to tablets when node is read-only (#15693)
99ada4fbe40 is described below
commit 99ada4fbe40966951ab9041c50372b8c68e90fd1
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Jun 16 16:09:25 2025 +0800
Load: Convert to tablets when node is read-only (#15693)
---
.../plan/analyze/load/LoadTsFileAnalyzer.java | 9 +++-----
.../plan/scheduler/load/LoadTsFileScheduler.java | 27 +++++++---------------
2 files changed, 11 insertions(+), 25 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 1fa4e313b1b..c1d8c3fe979 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
-import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -259,16 +258,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
analysis.setFailStatus(
RpcUtils.getStatus(
TSStatusCode.LOAD_FILE_ERROR,
- "TSFile encryption is enabled, and the Load TSFile function is
disabled"));
+ "TsFile encryption is enabled, and the Load TsFile function is
disabled"));
return false;
}
// check if the system is read only
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(
- RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY,
LoadReadOnlyException.MESSAGE));
- return false;
+ LOGGER.info(
+ "LoadTsFileAnalyzer: Current datanode is read only, will try to
convert to tablets and insert later.");
}
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 6b5002e1891..d00ae8d67e8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -270,7 +270,6 @@ public class LoadTsFileScheduler implements IScheduler {
} catch (Exception e) {
isLoadSuccess = false;
failedTsFileNodeIndexes.add(i);
- stateMachine.transitionToFailed(e);
LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath,
e);
} finally {
if (shouldRemoveFileFromLoadingSet) {
@@ -319,11 +318,9 @@ public class LoadTsFileScheduler implements IScheduler {
node.getTsFileResource().getTsFile(),
tsFileDataManager::addOrSendTsFileData)
.splitTsFileByDataPartition();
if (!tsFileDataManager.sendAllTsFileData()) {
- stateMachine.transitionToFailed(new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
return false;
}
} catch (IllegalStateException e) {
- stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format(
"Dispatch TsFileData error when parsing TsFile %s.",
@@ -331,7 +328,6 @@ public class LoadTsFileScheduler implements IScheduler {
e);
return false;
} catch (Exception e) {
- stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format("Parse or send TsFile %s error.",
node.getTsFileResource().getTsFile()), e);
return false;
@@ -361,7 +357,6 @@ public class LoadTsFileScheduler implements IScheduler {
dispatchResultFuture.get(
CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(),
TimeUnit.SECONDS);
if (!result.isSuccessful()) {
- // TODO: retry.
LOGGER.warn(
"Dispatch one piece to ReplicaSet {} error. Result status code {}.
"
+ "Result status message {}. Dispatch piece node error:%n{}",
@@ -381,7 +376,6 @@ public class LoadTsFileScheduler implements IScheduler {
status.setMessage(
String.format("Load %s piece error in 1st phase. Because ",
pieceNode.getTsFile())
+ status.getMessage());
- stateMachine.transitionToFailed(status); // TODO: record more status
return false;
}
} catch (InterruptedException | ExecutionException | CancellationException
e) {
@@ -389,13 +383,11 @@ public class LoadTsFileScheduler implements IScheduler {
Thread.currentThread().interrupt();
}
LOGGER.warn("Interrupt or Execution error.", e);
- stateMachine.transitionToFailed(e);
return false;
} catch (TimeoutException e) {
dispatchResultFuture.cancel(true);
LOGGER.warn(
String.format("Wait for loading %s time out.",
LoadTsFilePieceNode.class.getName()), e);
- stateMachine.transitionToFailed(e);
return false;
}
return true;
@@ -436,7 +428,6 @@ public class LoadTsFileScheduler implements IScheduler {
FragInstanceDispatchResult result = dispatchResultFuture.get();
if (!result.isSuccessful()) {
- // TODO: retry.
LOGGER.warn(
"Dispatch load command {} of TsFile {} error to replicaSets {}
error. "
+ "Result status code {}. Result status message {}.",
@@ -450,7 +441,6 @@ public class LoadTsFileScheduler implements IScheduler {
String.format(
"Load %s error in second phase. Because %s, first phase is %s",
tsFile, status.getMessage(), isFirstPhaseSuccess ? "success" :
"failed"));
- stateMachine.transitionToFailed(status);
return false;
}
} catch (InterruptedException | ExecutionException e) {
@@ -458,11 +448,9 @@ public class LoadTsFileScheduler implements IScheduler {
Thread.currentThread().interrupt();
}
LOGGER.warn("Interrupt or Execution error.", e);
- stateMachine.transitionToFailed(e);
return false;
} catch (Exception e) {
LOGGER.warn("Exception occurred during second phase of loading TsFile
{}.", tsFile, e);
- stateMachine.transitionToFailed(e);
return false;
}
return true;
@@ -523,7 +511,6 @@ public class LoadTsFileScheduler implements IScheduler {
node.getTsFileResource().getTsFile(),
TSStatusCode.representOf(e.getFailureStatus().getCode()).name(),
e.getFailureStatus().getMessage()));
- stateMachine.transitionToFailed(e.getFailureStatus());
return false;
}
@@ -627,14 +614,16 @@ public class LoadTsFileScheduler implements IScheduler {
// If all failed TsFiles are converted into tablets and inserted,
// we can consider the load process as successful.
if (failedTsFileNodeIndexes.isEmpty()) {
+ LOGGER.info("Load: all failed TsFiles are converted to tablets and
inserted.");
stateMachine.transitionToFinished();
} else {
- stateMachine.transitionToFailed(
- new LoadFileException(
- "Failed to load some TsFiles by converting them into tablets.
Failed TsFiles: "
- + failedTsFileNodeIndexes.stream()
- .map(i ->
tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
- .collect(Collectors.joining(", "))));
+ final String errorMsg =
+ "Load: failed to load some TsFiles by converting them into tablets.
Failed TsFiles: "
+ + failedTsFileNodeIndexes.stream()
+ .map(i ->
tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+ .collect(Collectors.joining(", "));
+ LOGGER.warn(errorMsg);
+ stateMachine.transitionToFailed(new LoadFileException(errorMsg));
}
}