This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eb5d1df9a4e Load: Convert TsFiles into Tablets when the target regions are unavailable (#14626)
eb5d1df9a4e is described below

commit eb5d1df9a4e0ccab162f68d4d7ff669faea6bc49
Author: Itami Sho <[email protected]>
AuthorDate: Mon Jan 6 15:10:02 2025 +0800

    Load: Convert TsFiles into Tablets when the target regions are unavailable (#14626)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../plan/analyze/load/LoadTsFileAnalyzer.java      | 19 ++---
 .../plan/relational/sql/ast/LoadTsFile.java        |  5 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 80 ++++++++++++++++++++--
 .../converter/LoadTsFileDataTypeConverter.java     | 29 +++++---
 .../load/metrics/LoadTsFileCostMetricsSet.java     | 13 +++-
 5 files changed, 119 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 71e985be113..2679b6c196a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -82,8 +82,6 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
   final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
   final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
 
-  protected final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
-
   LoadTsFileAnalyzer(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
     this.loadTsFileTreeStatement = loadTsFileStatement;
     this.tsFiles = loadTsFileStatement.getTsFiles();
@@ -94,7 +92,6 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
     this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
     this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
     this.database = loadTsFileStatement.getDatabase();
-    this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
 
     this.loadTsFileTableStatement = null;
     this.isTableModelStatement = false;
@@ -111,7 +108,6 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
     this.isAutoCreateDatabase = loadTsFileTableStatement.isAutoCreateDatabase();
     this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
     this.database = loadTsFileTableStatement.getDatabase();
-    this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
 
     this.loadTsFileTreeStatement = null;
     this.isTableModelStatement = true;
@@ -178,19 +174,24 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
 
   protected void executeDataTypeConversionOnTypeMismatch(
       final IAnalysis analysis, final VerifyMetadataTypeMismatchException e) {
+    final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+        new LoadTsFileDataTypeConverter();
+
     final TSStatus status =
         isConvertOnTypeMismatch
             ? (isTableModelStatement
-                ? loadTsFileDataTypeConverter.convertForTableModel(loadTsFileTableStatement)
-                : loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileTreeStatement))
+                ? loadTsFileDataTypeConverter
+                    .convertForTableModel(loadTsFileTableStatement)
+                    .orElse(null)
+                : loadTsFileDataTypeConverter
+                    .convertForTreeModel(loadTsFileTreeStatement)
+                    .orElse(null))
             : null;
 
     if (status == null) {
       analysis.setFailStatus(
           new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
-    } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-        && status.getCode() != TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+    } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
       analysis.setFailStatus(status);
     }
     analysis.setFinishQueryAfterAnalyze(true);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index e61d9353852..2025df1a973 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -63,7 +63,7 @@ public class LoadTsFile extends Statement {
     this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.resources = new ArrayList<>();
     this.writePointCountList = new ArrayList<>();
-    this.loadAttributes = loadAttributes;
+    this.loadAttributes = loadAttributes == null ? Collections.emptyMap() : loadAttributes;
     initAttributes();
 
     try {
@@ -107,8 +107,9 @@ public class LoadTsFile extends Statement {
     return database;
   }
 
-  public void setDatabase(String database) {
+  public LoadTsFile setDatabase(String database) {
     this.database = database;
+    return this;
   }
 
   public String getModel() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 16a84777c3b..aab470880cf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -53,14 +53,17 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
 import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.PlainDeviceTimeIndex;
+import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileDataCacheMemoryBlock;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -130,6 +133,7 @@ public class LoadTsFileScheduler implements IScheduler {
   private final LoadTsFileDispatcherImpl dispatcher;
   private final DataPartitionBatchFetcher partitionFetcher;
   private final List<LoadSingleTsFileNode> tsFileNodeList;
+  private final List<Integer> failedTsFileNodeIndexes;
   private final PlanFragmentId fragmentId;
   private final Set<TRegionReplicaSet> allReplicaSets;
   private final boolean isGeneratedByPipe;
@@ -145,6 +149,7 @@ public class LoadTsFileScheduler implements IScheduler {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.tsFileNodeList = new ArrayList<>();
+    this.failedTsFileNodeIndexes = new ArrayList<>();
     this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
     this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
@@ -191,8 +196,8 @@ public class LoadTsFileScheduler implements IScheduler {
           } else if (!node.needDecodeTsFile(
               slotList ->
                   partitionFetcher.queryDataPartition(
-                      slotList,
-                      queryContext.getSession().getUserName()))) { // do not decode, load locally
+                      slotList, queryContext.getSession().getUserName()))) {
+            // do not decode, load locally
             final long startTime = System.nanoTime();
             try {
               isLoadSingleTsFileSuccess = loadLocally(node);
@@ -200,7 +205,8 @@ public class LoadTsFileScheduler implements IScheduler {
               LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
                   LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() - startTime);
             }
-          } else { // need decode, load locally or remotely, use two phases method
+          } else {
+            // need decode, load locally or remotely, use two phases method
             String uuid = UUID.randomUUID().toString();
             dispatcher.setUuid(uuid);
             allReplicaSets.clear();
@@ -239,6 +245,7 @@ public class LoadTsFileScheduler implements IScheduler {
                 tsFileNodeListSize);
           } else {
             isLoadSuccess = false;
+            failedTsFileNodeIndexes.add(i);
             LOGGER.warn(
                 "Can not Load TsFile {}, load process [{}/{}]",
                 filePath,
@@ -247,6 +254,7 @@ public class LoadTsFileScheduler implements IScheduler {
           }
         } catch (Exception e) {
           isLoadSuccess = false;
+          failedTsFileNodeIndexes.add(i);
           stateMachine.transitionToFailed(e);
           LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e);
         } finally {
@@ -257,8 +265,18 @@ public class LoadTsFileScheduler implements IScheduler {
           }
         }
       }
+
       if (isLoadSuccess) {
         stateMachine.transitionToFinished();
+      } else {
+        final long startTime = System.nanoTime();
+        try {
+          // if failed to load some TsFiles, then try to convert the TsFiles to Tablets
+          convertFailedTsFilesToTabletsAndRetry();
+        } finally {
+          LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+              LoadTsFileCostMetricsSet.CAST_TABLETS, System.nanoTime() - startTime);
+        }
       }
     } finally {
       LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
@@ -351,7 +369,7 @@ public class LoadTsFileScheduler implements IScheduler {
           for (TSStatus status : result.getFailureStatus().getSubStatus()) {
             LOGGER.warn(
                 "Sub status code {}. Sub status message {}.",
-                TSStatusCode.representOf(status.getCode()).name(),
+                TSStatusCode.representOf(status.getCode()).toString(),
                 status.getMessage());
           }
         }
@@ -534,6 +552,60 @@ public class LoadTsFileScheduler implements IScheduler {
     return true;
   }
 
+  private void convertFailedTsFilesToTabletsAndRetry() {
+    final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+        new LoadTsFileDataTypeConverter();
+
+    for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
+      final LoadSingleTsFileNode failedNode = tsFileNodeList.get(failedLoadTsFileIndex);
+      final String filePath = failedNode.getTsFileResource().getTsFilePath();
+
+      try {
+        final TSStatus status =
+            failedNode.isTableModel()
+                ? loadTsFileDataTypeConverter
+                    .convertForTableModel(
+                        new LoadTsFile(null, filePath, Collections.emptyMap())
+                            .setDatabase(failedNode.getDatabase()))
+                    .orElse(null)
+                : loadTsFileDataTypeConverter
+                    .convertForTreeModel(new LoadTsFileStatement(filePath))
+                    .orElse(null);
+
+        if (loadTsFileDataTypeConverter.isSuccessful(status)) {
+          failedTsFileNodeIndexes.remove(failedLoadTsFileIndex);
+          LOGGER.info(
+              "Load: Successfully converted TsFile {} into tablets and inserted.",
+              failedNode.getTsFileResource().getTsFilePath());
+        } else {
+          LOGGER.warn(
+              "Load: Failed to convert to tablets from TsFile {}. Status: {}",
+              failedNode.getTsFileResource().getTsFilePath(),
+              status);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Load: Failed to convert to tablets from TsFile {}. Exception: {}",
+            failedNode.getTsFileResource().getTsFilePath(),
+            e.getMessage(),
+            e);
+      }
+    }
+
+    // If all failed TsFiles are converted into tablets and inserted,
+    // we can consider the load process as successful.
+    if (failedTsFileNodeIndexes.isEmpty()) {
+      stateMachine.transitionToFinished();
+    } else {
+      stateMachine.transitionToFailed(
+          new LoadFileException(
+              "Failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+                  + failedTsFileNodeIndexes.stream()
+                      .map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+                      .collect(Collectors.joining(", "))));
+    }
+  }
+
   @Override
   public void stop(Throwable t) {
     // Do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index 7ad7a5af544..e1ad5c19cd1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -34,6 +34,8 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 public class LoadTsFileDataTypeConverter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
@@ -78,30 +80,35 @@ public class LoadTsFileDataTypeConverter {
   public static final LoadConvertedInsertTabletStatementExceptionVisitor
       STATEMENT_EXCEPTION_VISITOR = new LoadConvertedInsertTabletStatementExceptionVisitor();
 
-  public TSStatus convertForTableModel(LoadTsFile loadTsFileTableStatement) {
+  public Optional<TSStatus> convertForTableModel(LoadTsFile loadTsFileTableStatement) {
     try {
-      return loadTsFileTableStatement
-          .accept(
-              tableStatementDataTypeConvertExecutionVisitor, loadTsFileTableStatement.getDatabase())
-          .orElse(null);
+      return loadTsFileTableStatement.accept(
+          tableStatementDataTypeConvertExecutionVisitor, loadTsFileTableStatement.getDatabase());
     } catch (Exception e) {
       LOGGER.warn(
           "Failed to convert data types for table model statement {}.",
           loadTsFileTableStatement,
           e);
-      return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      return Optional.of(
+          new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     }
   }
 
-  public TSStatus convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) {
+  public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) {
     try {
-      return loadTsFileTreeStatement
-          .accept(treeStatementDataTypeConvertExecutionVisitor, null)
-          .orElse(null);
+      return loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, null);
     } catch (Exception e) {
       LOGGER.warn(
           "Failed to convert data types for tree model statement {}.", loadTsFileTreeStatement, e);
-      return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      return Optional.of(
+          new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     }
   }
+
+  public boolean isSuccessful(final TSStatus status) {
+    return status != null
+        && (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+            || status.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode());
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index fd0f398644d..5947f101b06 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -40,6 +40,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
   public static final String FIRST_PHASE = "first_phase";
   public static final String SECOND_PHASE = "second_phase";
   public static final String LOAD_LOCALLY = "load_locally";
+  public static final String CAST_TABLETS = "cast_tablets";
 
   private LoadTsFileCostMetricsSet() {
     // empty constructor
@@ -49,6 +50,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
   private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer castTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
@@ -66,6 +68,9 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
       case LOAD_LOCALLY:
         loadLocallyTimer.updateNanos(costTimeInNanos);
         break;
+      case CAST_TABLETS:
+        castTabletsTimer.updateNanos(costTimeInNanos);
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported stage: " + stage);
     }
@@ -98,6 +103,12 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
             LOAD_LOCALLY);
+    castTabletsTimer =
+        metricService.getOrCreateTimer(
+            Metric.LOAD_TIME_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            CAST_TABLETS);
 
     diskIOCounter =
         metricService.getOrCreateCounter(
@@ -109,7 +120,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY)
+    Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY, CAST_TABLETS)
         .forEach(
             stage ->
                 metricService.remove(

Reply via email to