This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch load-file-lock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0329c3df395cb26f860545f059f3999e2d1dd203
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Apr 27 02:13:10 2024 +0800

    Load: Execute tsfile serially to avoid concurrent serialization of the resource file
---
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 40 +++++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index cbc3775f311..e166fa93856 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -112,6 +113,8 @@ public class LoadTsFileScheduler implements IScheduler {
   private static final int TRANSMIT_LIMIT =
       
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
 
+  private static final Set<String> LOADING_FILE_SET = new HashSet<>();
+
   private final MPPQueryContext queryContext;
   private final QueryStateMachine stateMachine;
   private final LoadTsFileDispatcherImpl dispatcher;
@@ -152,14 +155,23 @@ public class LoadTsFileScheduler implements IScheduler {
       boolean isLoadSuccess = true;
 
       for (int i = 0; i < tsFileNodeListSize; ++i) {
-        LoadSingleTsFileNode node = tsFileNodeList.get(i);
+        final LoadSingleTsFileNode node = tsFileNodeList.get(i);
+        final String filePath = node.getTsFileResource().getTsFilePath();
+
         boolean isLoadSingleTsFileSuccess = true;
+        boolean shouldRemoveFileFromLoadingSet = false;
         try {
-          if (node.isTsFileEmpty()) {
-            LOGGER.info(
-                "Load skip TsFile {}, because it has no data.",
-                node.getTsFileResource().getTsFilePath());
+          synchronized (LOADING_FILE_SET) {
+            if (LOADING_FILE_SET.contains(filePath)) {
+              throw new LoadFileException(
+                  String.format("TsFile %s is loading by another scheduler.", 
filePath));
+            }
+            LOADING_FILE_SET.add(filePath);
+          }
+          shouldRemoveFileFromLoadingSet = true;
 
+          if (node.isTsFileEmpty()) {
+            LOGGER.info("Load skip TsFile {}, because it has no data.", 
filePath);
           } else if (!node.needDecodeTsFile(
               slotList ->
                   partitionFetcher.queryDataPartition(
@@ -167,7 +179,6 @@ public class LoadTsFileScheduler implements IScheduler {
                       queryContext.getSession().getUserName()))) { // do not 
decode, load locally
             isLoadSingleTsFileSuccess = loadLocally(node);
             node.clean();
-
           } else { // need decode, load locally or remotely, use two phases 
method
             String uuid = UUID.randomUUID().toString();
             dispatcher.setUuid(uuid);
@@ -182,33 +193,36 @@ public class LoadTsFileScheduler implements IScheduler {
               isLoadSingleTsFileSuccess = false;
             }
           }
+
           if (isLoadSingleTsFileSuccess) {
             LOGGER.info(
                 "Load TsFile {} Successfully, load process [{}/{}]",
-                node.getTsFileResource().getTsFilePath(),
+                filePath,
                 i + 1,
                 tsFileNodeListSize);
           } else {
             isLoadSuccess = false;
             LOGGER.warn(
                 "Can not Load TsFile {}, load process [{}/{}]",
-                node.getTsFileResource().getTsFilePath(),
+                filePath,
                 i + 1,
                 tsFileNodeListSize);
           }
         } catch (Exception e) {
           isLoadSuccess = false;
           stateMachine.transitionToFailed(e);
-          LOGGER.warn(
-              "LoadTsFileScheduler loads TsFile {} error",
-              node.getTsFileResource().getTsFilePath(),
-              e);
+          LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, 
e);
+        } finally {
+          if (shouldRemoveFileFromLoadingSet) {
+            synchronized (LOADING_FILE_SET) {
+              LOADING_FILE_SET.remove(filePath);
+            }
+          }
         }
       }
       if (isLoadSuccess) {
         stateMachine.transitionToFinished();
       }
-
     } finally {
       LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
     }

Reply via email to