This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5ef4ea1b937 Load: Convert TsFiles into Tablets when the target regions are unavailable (#14626) (#14642)
5ef4ea1b937 is described below

commit 5ef4ea1b937c2ac4e3cbc0f622216a2e387207f9
Author: Itami Sho <[email protected]>
AuthorDate: Tue Jan 7 22:45:50 2025 +0800

    Load: Convert TsFiles into Tablets when the target regions are unavailable (#14626) (#14642)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit eb5d1df9a4e0ccab162f68d4d7ff669faea6bc49)
---
 .../plan/analyze/LoadTsFileAnalyzer.java           | 11 ++--
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 73 ++++++++++++++++++++--
 .../converter/LoadTsFileDataTypeConverter.java     | 18 ++++--
 .../load/metrics/LoadTsFileCostMetricsSet.java     | 13 +++-
 4 files changed, 98 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index c2f31b9e35a..384d0e30ede 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -131,8 +131,6 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
   private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
 
-  private final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
-
   LoadTsFileAnalyzer(
       LoadTsFileStatement loadTsFileStatement,
       MPPQueryContext context,
@@ -145,7 +143,6 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     this.schemaFetcher = schemaFetcher;
 
     this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
-    this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
   }
 
   public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
@@ -231,17 +228,17 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
   private void executeDataTypeConversionOnTypeMismatch(
       final Analysis analysis, final VerifyMetadataTypeMismatchException e) {
+    final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+        new LoadTsFileDataTypeConverter();
     final TSStatus status =
         loadTsFileStatement.isConvertOnTypeMismatch()
-            ? 
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement)
+            ? 
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null)
             : null;
 
     if (status == null) {
       analysis.setFailStatus(
           new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
-    } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
-        && status.getCode() != 
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+    } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
       analysis.setFailStatus(status);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index e6d472b50dd..0a46eefa4e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -55,10 +55,12 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsF
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import 
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileDataCacheMemoryBlock;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
@@ -126,6 +128,7 @@ public class LoadTsFileScheduler implements IScheduler {
   private final LoadTsFileDispatcherImpl dispatcher;
   private final DataPartitionBatchFetcher partitionFetcher;
   private final List<LoadSingleTsFileNode> tsFileNodeList;
+  private final List<Integer> failedTsFileNodeIndexes;
   private final PlanFragmentId fragmentId;
   private final Set<TRegionReplicaSet> allReplicaSets;
   private final boolean isGeneratedByPipe;
@@ -141,6 +144,7 @@ public class LoadTsFileScheduler implements IScheduler {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.tsFileNodeList = new ArrayList<>();
+    this.failedTsFileNodeIndexes = new ArrayList<>();
     this.fragmentId = 
distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
     this.dispatcher = new 
LoadTsFileDispatcherImpl(internalServiceClientManager, isGeneratedByPipe);
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
@@ -181,8 +185,8 @@ public class LoadTsFileScheduler implements IScheduler {
           } else if (!node.needDecodeTsFile(
               slotList ->
                   partitionFetcher.queryDataPartition(
-                      slotList,
-                      queryContext.getSession().getUserName()))) { // do not 
decode, load locally
+                      slotList, queryContext.getSession().getUserName()))) {
+            // do not decode, load locally
             final long startTime = System.nanoTime();
             try {
               isLoadSingleTsFileSuccess = loadLocally(node);
@@ -190,7 +194,8 @@ public class LoadTsFileScheduler implements IScheduler {
               LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
                   LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() - 
startTime);
             }
-          } else { // need decode, load locally or remotely, use two phases 
method
+          } else {
+            // need decode, load locally or remotely, use two phases method
             String uuid = UUID.randomUUID().toString();
             dispatcher.setUuid(uuid);
             allReplicaSets.clear();
@@ -229,6 +234,7 @@ public class LoadTsFileScheduler implements IScheduler {
                 tsFileNodeListSize);
           } else {
             isLoadSuccess = false;
+            failedTsFileNodeIndexes.add(i);
             LOGGER.warn(
                 "Can not Load TsFile {}, load process [{}/{}]",
                 filePath,
@@ -237,6 +243,7 @@ public class LoadTsFileScheduler implements IScheduler {
           }
         } catch (Exception e) {
           isLoadSuccess = false;
+          failedTsFileNodeIndexes.add(i);
           stateMachine.transitionToFailed(e);
           LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, 
e);
         } finally {
@@ -247,8 +254,18 @@ public class LoadTsFileScheduler implements IScheduler {
           }
         }
       }
+
       if (isLoadSuccess) {
         stateMachine.transitionToFinished();
+      } else {
+        final long startTime = System.nanoTime();
+        try {
+          // if failed to load some TsFiles, then try to convert the TsFiles 
to Tablets
+          convertFailedTsFilesToTabletsAndRetry();
+        } finally {
+          LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+              LoadTsFileCostMetricsSet.CAST_TABLETS, System.nanoTime() - 
startTime);
+        }
       }
     } finally {
       LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
@@ -341,7 +358,7 @@ public class LoadTsFileScheduler implements IScheduler {
           for (TSStatus status : result.getFailureStatus().getSubStatus()) {
             LOGGER.warn(
                 "Sub status code {}. Sub status message {}.",
-                TSStatusCode.representOf(status.getCode()).name(),
+                TSStatusCode.representOf(status.getCode()).toString(),
                 status.getMessage());
           }
         }
@@ -506,6 +523,54 @@ public class LoadTsFileScheduler implements IScheduler {
     return true;
   }
 
+  /**
+   * Retries every TsFile recorded in {@code failedTsFileNodeIndexes} by passing a new {@link
+   * LoadTsFileStatement} for its path to {@link LoadTsFileDataTypeConverter#convertForTreeModel}.
+   *
+   * <p>Indexes whose conversion is reported successful are dropped from {@code
+   * failedTsFileNodeIndexes}. Afterwards the state machine transitions to finished when no failed
+   * index remains, and to failed (with the remaining file paths in the message) otherwise.
+   */
+  private void convertFailedTsFilesToTabletsAndRetry() {
+    final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+        new LoadTsFileDataTypeConverter();
+
+    // Collect the indexes that still fail in a separate list instead of removing entries from
+    // failedTsFileNodeIndexes while iterating over it: removing inside the for-each can throw
+    // ConcurrentModificationException, and List#remove(int) removes by position, not by value.
+    final List<Integer> stillFailedIndexes = new ArrayList<>();
+
+    for (final int failedLoadTsFileIndex : failedTsFileNodeIndexes) {
+      final String filePath =
+          tsFileNodeList.get(failedLoadTsFileIndex).getTsFileResource().getTsFilePath();
+
+      boolean isConverted = false;
+      try {
+        final TSStatus status =
+            loadTsFileDataTypeConverter
+                .convertForTreeModel(new LoadTsFileStatement(filePath))
+                .orElse(null);
+
+        isConverted = loadTsFileDataTypeConverter.isSuccessful(status);
+        if (isConverted) {
+          LOGGER.info(
+              "Load: Successfully converted TsFile {} into tablets and inserted.", filePath);
+        } else {
+          LOGGER.warn(
+              "Load: Failed to convert to tablets from TsFile {}. Status: {}", filePath, status);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Load: Failed to convert to tablets from TsFile {}. Exception: {}",
+            filePath,
+            e.getMessage(),
+            e);
+      }
+
+      if (!isConverted) {
+        stillFailedIndexes.add(failedLoadTsFileIndex);
+      }
+    }
+
+    // Keep only the indexes that could not be converted, preserving their original order.
+    failedTsFileNodeIndexes.clear();
+    failedTsFileNodeIndexes.addAll(stillFailedIndexes);
+
+    // If all failed TsFiles are converted into tablets and inserted,
+    // we can consider the load process as successful.
+    if (failedTsFileNodeIndexes.isEmpty()) {
+      stateMachine.transitionToFinished();
+    } else {
+      stateMachine.transitionToFailed(
+          new LoadFileException(
+              "Failed to load some TsFiles by converting them into tablets. Failed TsFiles: "
+                  + failedTsFileNodeIndexes.stream()
+                      .map(i -> tsFileNodeList.get(i).getTsFileResource().getTsFilePath())
+                      .collect(Collectors.joining(", "))));
+    }
+  }
+
   @Override
   public void stop(Throwable t) {
     // Do nothing
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
index b3824140483..05fe606931e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 public class LoadTsFileDataTypeConverter {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
@@ -58,15 +60,21 @@ public class LoadTsFileDataTypeConverter {
   public static final LoadConvertedInsertTabletStatementExceptionVisitor
       STATEMENT_EXCEPTION_VISITOR = new 
LoadConvertedInsertTabletStatementExceptionVisitor();
 
-  public TSStatus convertForTreeModel(LoadTsFileStatement 
loadTsFileTreeStatement) {
+  public Optional<TSStatus> convertForTreeModel(LoadTsFileStatement 
loadTsFileTreeStatement) {
     try {
-      return loadTsFileTreeStatement
-          .accept(treeStatementDataTypeConvertExecutionVisitor, null)
-          .orElse(null);
+      return 
loadTsFileTreeStatement.accept(treeStatementDataTypeConvertExecutionVisitor, 
null);
     } catch (Exception e) {
       LOGGER.warn(
           "Failed to convert data types for tree model statement {}.", 
loadTsFileTreeStatement, e);
-      return new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      return Optional.of(
+          new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
     }
   }
+
+  /**
+   * Tells whether the given status counts as a successful conversion.
+   *
+   * @param status the status to inspect; may be {@code null}
+   * @return {@code true} if {@code status} is non-null and its code equals the status code of
+   *     {@code SUCCESS_STATUS}, {@code REDIRECTION_RECOMMEND} or {@code
+   *     LOAD_IDEMPOTENT_CONFLICT_EXCEPTION}; {@code false} otherwise
+   */
+  public boolean isSuccessful(final TSStatus status) {
+    if (status == null) {
+      return false;
+    }
+
+    final int code = status.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index fd0f398644d..5947f101b06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -40,6 +40,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
   public static final String FIRST_PHASE = "first_phase";
   public static final String SECOND_PHASE = "second_phase";
   public static final String LOAD_LOCALLY = "load_locally";
+  public static final String CAST_TABLETS = "cast_tablets";
 
   private LoadTsFileCostMetricsSet() {
     // empty constructor
@@ -49,6 +50,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
   private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer castTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
@@ -66,6 +68,9 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
       case LOAD_LOCALLY:
         loadLocallyTimer.updateNanos(costTimeInNanos);
         break;
+      case CAST_TABLETS:
+        castTabletsTimer.updateNanos(costTimeInNanos);
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported stage: " + stage);
     }
@@ -98,6 +103,12 @@ public class LoadTsFileCostMetricsSet implements IMetricSet 
{
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
             LOAD_LOCALLY);
+    castTabletsTimer =
+        metricService.getOrCreateTimer(
+            Metric.LOAD_TIME_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            CAST_TABLETS);
 
     diskIOCounter =
         metricService.getOrCreateCounter(
@@ -109,7 +120,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet 
{
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY)
+    Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY, 
CAST_TABLETS)
         .forEach(
             stage ->
                 metricService.remove(

Reply via email to