This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eb5d1df9a4e Load: Convert TsFiles into Tablets when the target regions
are unavailable (#14626)
eb5d1df9a4e is described below
commit eb5d1df9a4e0ccab162f68d4d7ff669faea6bc49
Author: Itami Sho <[email protected]>
AuthorDate: Mon Jan 6 15:10:02 2025 +0800
Load: Convert TsFiles into Tablets when the target regions are unavailable
(#14626)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../plan/analyze/load/LoadTsFileAnalyzer.java | 19 ++---
.../plan/relational/sql/ast/LoadTsFile.java | 5 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 80 ++++++++++++++++++++--
.../converter/LoadTsFileDataTypeConverter.java | 29 +++++---
.../load/metrics/LoadTsFileCostMetricsSet.java | 13 +++-
5 files changed, 119 insertions(+), 27 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 71e985be113..2679b6c196a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -82,8 +82,6 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
final IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
- protected final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
-
LoadTsFileAnalyzer(LoadTsFileStatement loadTsFileStatement, MPPQueryContext
context) {
this.loadTsFileTreeStatement = loadTsFileStatement;
this.tsFiles = loadTsFileStatement.getTsFiles();
@@ -94,7 +92,6 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
this.database = loadTsFileStatement.getDatabase();
- this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
this.loadTsFileTableStatement = null;
this.isTableModelStatement = false;
@@ -111,7 +108,6 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
this.database = loadTsFileTableStatement.getDatabase();
- this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
this.loadTsFileTreeStatement = null;
this.isTableModelStatement = true;
@@ -178,19 +174,24 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
protected void executeDataTypeConversionOnTypeMismatch(
final IAnalysis analysis, final VerifyMetadataTypeMismatchException e) {
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter();
+
final TSStatus status =
isConvertOnTypeMismatch
? (isTableModelStatement
- ?
loadTsFileDataTypeConverter.convertForTableModel(loadTsFileTableStatement)
- :
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileTreeStatement))
+ ? loadTsFileDataTypeConverter
+ .convertForTableModel(loadTsFileTableStatement)
+ .orElse(null)
+ : loadTsFileDataTypeConverter
+ .convertForTreeModel(loadTsFileTreeStatement)
+ .orElse(null))
: null;
if (status == null) {
analysis.setFailStatus(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
- } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
- && status.getCode() !=
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+ } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
analysis.setFailStatus(status);
}
analysis.setFinishQueryAfterAnalyze(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index e61d9353852..2025df1a973 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -63,7 +63,7 @@ public class LoadTsFile extends Statement {
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
- this.loadAttributes = loadAttributes;
+ this.loadAttributes = loadAttributes == null ? Collections.emptyMap() :
loadAttributes;
initAttributes();
try {
@@ -107,8 +107,9 @@ public class LoadTsFile extends Statement {
return database;
}
- public void setDatabase(String database) {
+ public LoadTsFile setDatabase(String database) {
this.database = database;
+ return this;
}
public String getModel() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 16a84777c3b..aab470880cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -53,14 +53,17 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.PlainDeviceTimeIndex;
+import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -130,6 +133,7 @@ public class LoadTsFileScheduler implements IScheduler {
private final LoadTsFileDispatcherImpl dispatcher;
private final DataPartitionBatchFetcher partitionFetcher;
private final List<LoadSingleTsFileNode> tsFileNodeList;
+ private final List<Integer> failedTsFileNodeIndexes;
private final PlanFragmentId fragmentId;
private final Set<TRegionReplicaSet> allReplicaSets;
private final boolean isGeneratedByPipe;
@@ -145,6 +149,7 @@ public class LoadTsFileScheduler implements IScheduler {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.tsFileNodeList = new ArrayList<>();
+ this.failedTsFileNodeIndexes = new ArrayList<>();
this.fragmentId =
distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
this.dispatcher = new
LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
@@ -191,8 +196,8 @@ public class LoadTsFileScheduler implements IScheduler {
} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
- slotList,
- queryContext.getSession().getUserName()))) { // do not decode, load locally
+ slotList, queryContext.getSession().getUserName()))) {
+ // do not decode, load locally
final long startTime = System.nanoTime();
try {
isLoadSingleTsFileSuccess = loadLocally(node);
@@ -200,7 +205,8 @@ public class LoadTsFileScheduler implements IScheduler {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() -
startTime);
}
- } else { // need decode, load locally or remotely, use two phases method
+ } else {
+ // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();
@@ -239,6 +245,7 @@ public class LoadTsFileScheduler implements IScheduler {
tsFileNodeListSize);
} else {
isLoadSuccess = false;
+ failedTsFileNodeIndexes.add(i);
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
filePath,
@@ -247,6 +254,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
} catch (Exception e) {
isLoadSuccess = false;
+ failedTsFileNodeIndexes.add(i);
stateMachine.transitionToFailed(e);
LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath,
e);
} finally {
@@ -257,8 +265,18 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
}
+
if (isLoadSuccess) {
stateMachine.transitionToFinished();
+ } else {
+ final long startTime = System.nanoTime();
+ try {
+ // if failed to load some TsFiles, then try to convert the TsFiles to Tablets
+ convertFailedTsFilesToTabletsAndRetry();
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.CAST_TABLETS, System.nanoTime() -
startTime);
+ }
}
} finally {
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
@@ -351,7 +369,7 @@ public class LoadTsFileScheduler implements IScheduler {
for (TSStatus status : result.getFailureStatus().getSubStatus()) {
LOGGER.warn(
"Sub status code {}. Sub status message {}.",
- TSStatusCode.representOf(status.getCode()).name(),
+ TSStatusCode.representOf(status.getCode()).toString(),
status.getMessage());
}
}
@@ -534,6 +552,60 @@ public class LoadTsFileScheduler implements IScheduler {
return true;
}
+ private void convertFailedTsFilesToTabletsAndRetry() {
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter();
+
+ for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
+ final LoadSingleTsFileNode failedNode =
tsFileNodeList.get(failedLoadTsFileIndex);
+ final String filePath = failedNode.getTsFileResource().getTsFilePath();
+
+ try {
+ final TSStatus status =
+ failedNode.isTableModel()
+ ? loadTsFileDataTypeConverter
+ .convertForTableModel(
+ new LoadTsFile(null, filePath, Collections.emptyMap())
+ .setDatabase(failedNode.getDatabase()))
+ .orElse(null)
+ : loadTsFileDataTypeConverter
+ .convertForTreeModel(new LoadTsFileStatement(filePath))
+ .orElse(null);
+
+ if (loadTsFileDataTypeConverter.isSuccessful(status)) {
+ failedTsFileNodeIndexes.remove(failedLoadTsFileIndex);
+ LOGGER.info(
+ "Load: Successfully converted TsFile {} into tablets and inserted.",
+ failedNode.getTsFileResource().getTsFilePath());
+ } else {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from TsFile {}. Status: {}",
+ failedNode.getTsFileResource().getTsFilePath(),
+ status);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from TsFile {}. Exception: {}",
+ failedNode.getTsFileResource().getTsFilePath(),
+ e.getMessage(),
+ e);
+ }
+ }
+
+ // If all failed TsFiles are converted into tablets and inserted,
+ // we can consider the load process as successful.
+ if (failedTsFileNodeIndexes.isEmpty()) {
+ stateMachine.transitionToFinished();
+ } else {
+ stateMachine.transitionToFailed(
+ new LoadFileException(
+ "Failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+ + failedTsFileNodeIndexes.stream()
+ .map(i ->
tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+ .collect(Collectors.joining(", "))));
+ }
+ }
+
@Override
public void stop(Throwable t) {
// Do nothing
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 7ad7a5af544..e1ad5c19cd1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -34,6 +34,8 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
+
public class LoadTsFileDataTypeConverter {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
@@ -78,30 +80,35 @@ public class LoadTsFileDataTypeConverter {
public static final LoadConvertedInsertTabletStatementExceptionVisitor
STATEMENT_EXCEPTION_VISITOR = new
LoadConvertedInsertTabletStatementExceptionVisitor();
- public TSStatus convertForTableModel(LoadTsFile loadTsFileTableStatement) {
+ public Optional<TSStatus> convertForTableModel(LoadTsFile
loadTsFileTableStatement) {
try {
- return loadTsFileTableStatement
- .accept(
- tableStatementDataTypeConvertExecutionVisitor,
loadTsFileTableStatement.getDatabase())
- .orElse(null);
+ return loadTsFileTableStatement.accept(
+ tableStatementDataTypeConvertExecutionVisitor,
loadTsFileTableStatement.getDatabase());
} catch (Exception e) {
LOGGER.warn(
"Failed to convert data types for table model statement {}.",
loadTsFileTableStatement,
e);
- return new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ return Optional.of(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
}
}
- public TSStatus convertForTreeModel(LoadTsFileStatement
loadTsFileTreeStatement) {
+ public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement
loadTsFileTreeStatement) {
try {
- return loadTsFileTreeStatement
- .accept(treeStatementDataTypeConvertExecutionVisitor, null)
- .orElse(null);
+ return
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor,
null);
} catch (Exception e) {
LOGGER.warn(
"Failed to convert data types for tree model statement {}.",
loadTsFileTreeStatement, e);
- return new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ return Optional.of(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
}
}
+
+ public boolean isSuccessful(final TSStatus status) {
+ return status != null
+ && (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || status.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ || status.getCode() ==
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index fd0f398644d..5947f101b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -40,6 +40,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
public static final String FIRST_PHASE = "first_phase";
public static final String SECOND_PHASE = "second_phase";
public static final String LOAD_LOCALLY = "load_locally";
+ public static final String CAST_TABLETS = "cast_tablets";
private LoadTsFileCostMetricsSet() {
// empty constructor
@@ -49,6 +50,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer castTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
@@ -66,6 +68,9 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
case LOAD_LOCALLY:
loadLocallyTimer.updateNanos(costTimeInNanos);
break;
+ case CAST_TABLETS:
+ castTabletsTimer.updateNanos(costTimeInNanos);
+ break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
}
@@ -98,6 +103,12 @@ public class LoadTsFileCostMetricsSet implements IMetricSet
{
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
LOAD_LOCALLY);
+ castTabletsTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ CAST_TABLETS);
diskIOCounter =
metricService.getOrCreateCounter(
@@ -109,7 +120,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet
{
@Override
public void unbindFrom(AbstractMetricService metricService) {
- Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY)
+ Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY,
CAST_TABLETS)
.forEach(
stage ->
metricService.remove(