This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 5ef4ea1b937 Load: Convert TsFiles into Tablets when the target regions
are unavailable (#14626) (#14642)
5ef4ea1b937 is described below
commit 5ef4ea1b937c2ac4e3cbc0f622216a2e387207f9
Author: Itami Sho <[email protected]>
AuthorDate: Tue Jan 7 22:45:50 2025 +0800
Load: Convert TsFiles into Tablets when the target regions are unavailable
(#14626) (#14642)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit eb5d1df9a4e0ccab162f68d4d7ff669faea6bc49)
---
.../plan/analyze/LoadTsFileAnalyzer.java | 11 ++--
.../plan/scheduler/load/LoadTsFileScheduler.java | 73 ++++++++++++++++++++--
.../converter/LoadTsFileDataTypeConverter.java | 18 ++++--
.../load/metrics/LoadTsFileCostMetricsSet.java | 13 +++-
4 files changed, 98 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index c2f31b9e35a..384d0e30ede 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -131,8 +131,6 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
- private final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
-
LoadTsFileAnalyzer(
LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context,
@@ -145,7 +143,6 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.schemaFetcher = schemaFetcher;
this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
- this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
}
public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
@@ -231,17 +228,17 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private void executeDataTypeConversionOnTypeMismatch(
final Analysis analysis, final VerifyMetadataTypeMismatchException e) {
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter();
final TSStatus status =
loadTsFileStatement.isConvertOnTypeMismatch()
- ?
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement)
+ ?
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null)
: null;
if (status == null) {
analysis.setFailStatus(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
- } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- && status.getCode() !=
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+ } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
analysis.setFailStatus(status);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index e6d472b50dd..0a46eefa4e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -55,10 +55,12 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsF
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -126,6 +128,7 @@ public class LoadTsFileScheduler implements IScheduler {
private final LoadTsFileDispatcherImpl dispatcher;
private final DataPartitionBatchFetcher partitionFetcher;
private final List<LoadSingleTsFileNode> tsFileNodeList;
+ private final List<Integer> failedTsFileNodeIndexes;
private final PlanFragmentId fragmentId;
private final Set<TRegionReplicaSet> allReplicaSets;
private final boolean isGeneratedByPipe;
@@ -141,6 +144,7 @@ public class LoadTsFileScheduler implements IScheduler {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.tsFileNodeList = new ArrayList<>();
+ this.failedTsFileNodeIndexes = new ArrayList<>();
this.fragmentId =
distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
this.dispatcher = new
LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
@@ -181,8 +185,8 @@ public class LoadTsFileScheduler implements IScheduler {
} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
- slotList,
- queryContext.getSession().getUserName()))) { // do not
decode, load locally
+ slotList, queryContext.getSession().getUserName()))) {
+ // do not decode, load locally
final long startTime = System.nanoTime();
try {
isLoadSingleTsFileSuccess = loadLocally(node);
@@ -190,7 +194,8 @@ public class LoadTsFileScheduler implements IScheduler {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() -
startTime);
}
- } else { // need decode, load locally or remotely, use two phases
method
+ } else {
+ // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();
@@ -229,6 +234,7 @@ public class LoadTsFileScheduler implements IScheduler {
tsFileNodeListSize);
} else {
isLoadSuccess = false;
+ failedTsFileNodeIndexes.add(i);
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
filePath,
@@ -237,6 +243,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
} catch (Exception e) {
isLoadSuccess = false;
+ failedTsFileNodeIndexes.add(i);
stateMachine.transitionToFailed(e);
LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath,
e);
} finally {
@@ -247,8 +254,18 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
}
+
if (isLoadSuccess) {
stateMachine.transitionToFinished();
+ } else {
+ final long startTime = System.nanoTime();
+ try {
+ // if failed to load some TsFiles, then try to convert the TsFiles
to Tablets
+ convertFailedTsFilesToTabletsAndRetry();
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.CAST_TABLETS, System.nanoTime() -
startTime);
+ }
}
} finally {
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
@@ -341,7 +358,7 @@ public class LoadTsFileScheduler implements IScheduler {
for (TSStatus status : result.getFailureStatus().getSubStatus()) {
LOGGER.warn(
"Sub status code {}. Sub status message {}.",
- TSStatusCode.representOf(status.getCode()).name(),
+ TSStatusCode.representOf(status.getCode()).toString(),
status.getMessage());
}
}
@@ -506,6 +523,54 @@ public class LoadTsFileScheduler implements IScheduler {
return true;
}
+  /**
+   * Retries every TsFile whose index is recorded in {@code failedTsFileNodeIndexes} by converting it
+   * into tablets via {@link LoadTsFileDataTypeConverter#convertForTreeModel}, then transitions the
+   * state machine: to finished if every retry succeeded, otherwise to failed with a {@link
+   * LoadFileException} listing the TsFiles that still could not be loaded.
+   *
+   * <p>After this call {@code failedTsFileNodeIndexes} contains only the indexes whose retry failed.
+   */
+  private void convertFailedTsFilesToTabletsAndRetry() {
+    final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+        new LoadTsFileDataTypeConverter();
+
+    // Collect the indexes that still fail and swap them in afterwards. Removing from
+    // failedTsFileNodeIndexes inside the for-each loop is unsafe: List<Integer>.remove(int)
+    // removes by position rather than by value, and structurally modifying an ArrayList while
+    // iterating over it can throw ConcurrentModificationException.
+    final List<Integer> stillFailedTsFileNodeIndexes = new ArrayList<>();
+    for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
+      if (!convertTsFileToTabletsAndInsert(
+          loadTsFileDataTypeConverter, tsFileNodeList.get(failedLoadTsFileIndex))) {
+        stillFailedTsFileNodeIndexes.add(failedLoadTsFileIndex);
+      }
+    }
+    failedTsFileNodeIndexes.clear();
+    failedTsFileNodeIndexes.addAll(stillFailedTsFileNodeIndexes);
+
+    // If all failed TsFiles are converted into tablets and inserted,
+    // we can consider the load process as successful.
+    // NOTE(review): start() may already have called stateMachine.transitionToFailed(e) for a
+    // TsFile that threw while loading; confirm the state machine accepts a later
+    // transitionToFinished() in that case.
+    if (failedTsFileNodeIndexes.isEmpty()) {
+      stateMachine.transitionToFinished();
+    } else {
+      stateMachine.transitionToFailed(
+          new LoadFileException(
+              "Failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+                  + failedTsFileNodeIndexes.stream()
+                      .map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+                      .collect(Collectors.joining(", "))));
+    }
+  }
+
+  /**
+   * Converts one TsFile into tablets and inserts them, logging the outcome.
+   *
+   * @param converter the converter used both for the conversion and for judging its status
+   * @param node the load node whose TsFile should be converted
+   * @return {@code true} if the converter returned a status that {@link
+   *     LoadTsFileDataTypeConverter#isSuccessful} accepts; {@code false} if it returned a failure
+   *     status, returned no status, or threw
+   */
+  private boolean convertTsFileToTabletsAndInsert(
+      final LoadTsFileDataTypeConverter converter, final LoadSingleTsFileNode node) {
+    final String filePath = node.getTsFileResource().getTsFilePath();
+    try {
+      final TSStatus status =
+          converter.convertForTreeModel(new LoadTsFileStatement(filePath)).orElse(null);
+      if (converter.isSuccessful(status)) {
+        LOGGER.info(
+            "Load: Successfully converted TsFile {} into tablets and inserted.", filePath);
+        return true;
+      }
+      LOGGER.warn(
+          "Load: Failed to convert to tablets from TsFile {}. Status: {}", filePath, status);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Load: Failed to convert to tablets from TsFile {}. Exception: {}",
+          filePath,
+          e.getMessage(),
+          e);
+    }
+    return false;
+  }
+
@Override
public void stop(Throwable t) {
// Do nothing
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index b3824140483..05fe606931e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
+
public class LoadTsFileDataTypeConverter {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
@@ -58,15 +60,21 @@ public class LoadTsFileDataTypeConverter {
public static final LoadConvertedInsertTabletStatementExceptionVisitor
STATEMENT_EXCEPTION_VISITOR = new
LoadConvertedInsertTabletStatementExceptionVisitor();
- public TSStatus convertForTreeModel(LoadTsFileStatement
loadTsFileTreeStatement) {
+  /**
+   * Runs the tree-model data-type conversion visitor over {@code loadTsFileTreeStatement}.
+   *
+   * @param loadTsFileTreeStatement the load statement whose TsFile(s) should be converted
+   * @return the status produced by the visitor, which may be empty; if the visitor throws, the
+   *     exception is logged and reported as a {@code LOAD_FILE_ERROR} status carrying the
+   *     exception message
+   */
+  public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) {
try {
- return loadTsFileTreeStatement
- .accept(treeStatementDataTypeConvertExecutionVisitor, null)
- .orElse(null);
+    return loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, null);
} catch (Exception e) {
LOGGER.warn(
"Failed to convert data types for tree model statement {}.",
loadTsFileTreeStatement, e);
- return new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    final TSStatus loadFileErrorStatus =
+        new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    return Optional.of(loadFileErrorStatus);
}
}
+
+  /**
+   * Tells whether a conversion status should be treated as a successful load.
+   *
+   * @param status the status to inspect; may be {@code null}
+   * @return {@code true} for {@code SUCCESS_STATUS}, {@code REDIRECTION_RECOMMEND} and {@code
+   *     LOAD_IDEMPOTENT_CONFLICT_EXCEPTION}; {@code false} for any other code or a {@code null}
+   *     status
+   */
+  public boolean isSuccessful(final TSStatus status) {
+    if (status == null) {
+      return false;
+    }
+    final int code = status.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index fd0f398644d..5947f101b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -40,6 +40,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
public static final String FIRST_PHASE = "first_phase";
public static final String SECOND_PHASE = "second_phase";
public static final String LOAD_LOCALLY = "load_locally";
+ public static final String CAST_TABLETS = "cast_tablets";
private LoadTsFileCostMetricsSet() {
// empty constructor
@@ -49,6 +50,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer castTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
@@ -66,6 +68,9 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
case LOAD_LOCALLY:
loadLocallyTimer.updateNanos(costTimeInNanos);
break;
+ case CAST_TABLETS:
+ castTabletsTimer.updateNanos(costTimeInNanos);
+ break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
}
@@ -98,6 +103,12 @@ public class LoadTsFileCostMetricsSet implements IMetricSet
{
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
LOAD_LOCALLY);
+ castTabletsTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CAST_TABLETS);
diskIOCounter =
metricService.getOrCreateCounter(
@@ -109,7 +120,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet
{
@Override
public void unbindFrom(AbstractMetricService metricService) {
- Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY)
+ Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY,
CAST_TABLETS)
.forEach(
stage ->
metricService.remove(