This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 9ad7c19788c Pipe: Retry history LoadTsFile while waiting for schema 
(#18031) (#18042)
9ad7c19788c is described below

commit 9ad7c19788c828533d1ef1a5f0446f81725dfd28
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:19:25 2026 +0800

    Pipe: Retry history LoadTsFile while waiting for schema (#18031) (#18042)
    
    * Fix pipe history LoadTsFile schema retry
    
    * Address load tsfile schema retry review
    
    * Fix load tsfile tests with real temp files
    
    (cherry picked from commit 1811e13f6405a1ea2a38ad856bfa32c92ffdd78a)
---
 .../LoadAnalyzeMissingSchemaException.java         | 27 ++++++++
 .../visitor/PipeStatementTSStatusVisitor.java      |  4 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      | 80 ++++++++++++++++++++--
 .../receiver/PipeStatementTsStatusVisitorTest.java | 32 +++++++++
 4 files changed, 133 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java
new file mode 100644
index 00000000000..42da0e818a9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadAnalyzeMissingSchemaException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+public class LoadAnalyzeMissingSchemaException extends LoadAnalyzeException {
+
+  public LoadAnalyzeMissingSchemaException(final String message) {
+    super(message);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index d370bff2798..509be0301cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -74,9 +74,7 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   public TSStatus visitLoadFile(
       final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
     if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
-        || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
-            && status.getMessage() != null
-            && status.getMessage().contains("memory")) {
+        || status.getCode() == 
TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
           .setMessage(status.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 5902c342ec5..d47953abe9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadAnalyzeException;
+import org.apache.iotdb.db.exception.LoadAnalyzeMissingSchemaException;
 import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
@@ -214,6 +215,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       // the real result on the conversion will be set in the analysis.
       return analysis;
     } catch (Exception e) {
+      if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+        return analysis;
+      }
       final String exceptionMessage =
           String.format(
               "Auto create or verify schema error when executing statement %s. 
Detail: %s.",
@@ -315,6 +319,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
                 "The file %s is not a valid tsfile. Please check the input 
file.",
                 tsFile.getPath()));
       } catch (Exception e) {
+        if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+          return false;
+        }
         final String exceptionMessage =
             String.format(
                 "Loading file %s failed. Detail: %s",
@@ -484,6 +491,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
   private Analysis executeTabletConversionOnException(
       final Analysis analysis, final LoadAnalyzeException e) {
+    if (setTemporaryUnavailableStatusIfNecessary(analysis, e)) {
+      return analysis;
+    }
+
     if (shouldSkipConversion(e)) {
       analysis.setFailStatus(
           new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
@@ -517,6 +528,52 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     return analysis;
   }
 
+  private boolean setTemporaryUnavailableStatusIfNecessary(
+      final Analysis analysis, final Throwable throwable) {
+    if (isTemporaryUnavailableDueToPipeSchemaNotReady(throwable)) {
+      setFailAnalysisForTemporaryUnavailablePipeSchema(analysis, throwable);
+      return true;
+    }
+    if (isGeneratedByPipe && 
LoadTsFileDataTypeConverter.isMemoryPressureException(throwable)) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      
analysis.setFailStatus(LoadTsFileDataTypeConverter.getMemoryPressureStatus(throwable));
+      analysis.setStatement(loadTsFileStatement);
+      return true;
+    }
+    return false;
+  }
+
+  private void setFailAnalysisForTemporaryUnavailablePipeSchema(
+      final Analysis analysis, final Throwable throwable) {
+    final String exceptionMessage =
+        String.format(
+            "Pipe generated LoadTsFile is waiting for schema metadata to be 
transferred. Detail: %s",
+            throwable.getMessage() == null
+                ? throwable.getClass().getName()
+                : throwable.getMessage());
+    analysis.setFinishQueryAfterAnalyze(true);
+    analysis.setFailStatus(
+        RpcUtils.getStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, 
exceptionMessage));
+    analysis.setStatement(loadTsFileStatement);
+  }
+
+  boolean isTemporaryUnavailableDueToPipeSchemaNotReady(final Throwable 
throwable) {
+    if (!isGeneratedByPipe
+        || !isVerifySchema
+        || 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+      return false;
+    }
+
+    Throwable current = throwable;
+    while (current != null) {
+      if (current instanceof LoadAnalyzeMissingSchemaException) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
   private boolean shouldSkipConversion(LoadAnalyzeException e) {
     return (e instanceof LoadAnalyzeTypeMismatchException)
         && !loadTsFileStatement.isConvertOnTypeMismatch();
@@ -545,7 +602,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     public void autoCreateAndVerify(
         TsFileSequenceReader reader,
         Map<IDeviceID, List<TimeseriesMetadata>> device2TimeSeriesMetadataList)
-        throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
+        throws IOException, AuthException, LoadAnalyzeException {
       for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
           device2TimeSeriesMetadataList.entrySet()) {
         final IDeviceID device = entry.getKey();
@@ -647,14 +704,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       schemaCache.clearDeviceIsAlignedCacheIfNecessary();
     }
 
-    public void flush() throws AuthException, LoadAnalyzeTypeMismatchException 
{
+    public void flush() throws AuthException, LoadAnalyzeException {
       doAutoCreateAndVerify();
 
       schemaCache.clearTimeSeries();
     }
 
     private void doAutoCreateAndVerify()
-        throws SemanticException, AuthException, 
LoadAnalyzeTypeMismatchException {
+        throws SemanticException, AuthException, LoadAnalyzeException {
       if (schemaCache.getDevice2TimeSeries().isEmpty()) {
         return;
       }
@@ -677,6 +734,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
         }
       } catch (AuthException | LoadAnalyzeTypeMismatchException e) {
         throw e;
+      } catch (LoadAnalyzeMissingSchemaException e) {
+        if (isTemporaryUnavailableDueToPipeSchemaNotReady(e)) {
+          throw e;
+        }
+        LOGGER.warn("Auto create or verify schema error.", e);
+        throw new SemanticException(
+            String.format(
+                "Auto create or verify schema error when executing statement 
%s.  Detail: %s.",
+                loadTsFileStatement, e.getMessage()));
       } catch (Exception e) {
         if (e.getCause() instanceof LoadAnalyzeTypeMismatchException && 
isConvertOnTypeMismatch) {
           throw (LoadAnalyzeTypeMismatchException) e.getCause();
@@ -863,10 +929,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
                     .collect(Collectors.toList()));
 
         if (iotdbDeviceSchemaInfo == null) {
-          throw new LoadAnalyzeException(
+          throw new LoadAnalyzeMissingSchemaException(
               String.format(
                   "Device %s does not exist in IoTDB and can not be created. "
-                      + "Please check weather auto-create-schema is enabled.",
+                      + "Please check whether auto-create-schema is enabled.",
                   device));
         }
 
@@ -889,10 +955,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
           final MeasurementSchema tsFileSchema = 
tsfileTimeseriesSchemas.get(i);
           final MeasurementSchema iotdbSchema = iotdbTimeseriesSchemas.get(i);
           if (iotdbSchema == null) {
-            throw new LoadAnalyzeException(
+            throw new LoadAnalyzeMissingSchemaException(
                 String.format(
                     "Measurement %s does not exist in IoTDB and can not be 
created. "
-                        + "Please check weather auto-create-schema is 
enabled.",
+                        + "Please check whether auto-create-schema is 
enabled.",
                     device + TsFileConstant.PATH_SEPARATOR + 
tsfileTimeseriesSchemas.get(i)));
           }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
index 756d1181825..f2716d5c1a4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -24,12 +24,14 @@ import 
org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -64,6 +66,36 @@ public class PipeStatementTsStatusVisitorTest {
             .getCode());
   }
 
+  @Test
+  public void testLoadTemporaryUnavailableClassification() throws Exception {
+    final File tsFile = File.createTempFile("temporary-unavailable", 
".tsfile");
+    tsFile.deleteOnExit();
+
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+                new 
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+                    .setMessage("schema is not ready"))
+            .getCode());
+  }
+
+  @Test
+  public void testLoadFileErrorWithMemoryMessageIsNotClassifiedByMessage() 
throws Exception {
+    final File tsFile = File.createTempFile("memory-error", ".tsfile");
+    tsFile.deleteOnExit();
+
+    Assert.assertEquals(
+        TSStatusCode.LOAD_FILE_ERROR.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
+                new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
+                    .setMessage("memory pressure"))
+            .getCode());
+  }
+
   @Test
   public void testDatabaseNotExistRuntimeExceptionClassification() {
     Assert.assertEquals(

Reply via email to