This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch load-file-lock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0329c3df395cb26f860545f059f3999e2d1dd203 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Apr 27 02:13:10 2024 +0800 Load: Execute tsfile serially to avoid concurrent serialization of the resource file --- .../plan/scheduler/load/LoadTsFileScheduler.java | 40 +++++++++++++++------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index cbc3775f311..e166fa93856 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -37,6 +37,7 @@ import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.LoadReadOnlyException; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.pipe.agent.PipeAgent; @@ -112,6 +113,8 @@ public class LoadTsFileScheduler implements IScheduler { private static final int TRANSMIT_LIMIT = CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit(); + private static final Set<String> LOADING_FILE_SET = new HashSet<>(); + private final MPPQueryContext queryContext; private final QueryStateMachine stateMachine; private final LoadTsFileDispatcherImpl dispatcher; @@ -152,14 +155,23 @@ public class LoadTsFileScheduler implements IScheduler { boolean isLoadSuccess = true; for (int i = 0; i < tsFileNodeListSize; ++i) { - LoadSingleTsFileNode node = tsFileNodeList.get(i); + final LoadSingleTsFileNode node = tsFileNodeList.get(i); + final String filePath = node.getTsFileResource().getTsFilePath(); + boolean isLoadSingleTsFileSuccess = true; + boolean shouldRemoveFileFromLoadingSet = false; try { - if (node.isTsFileEmpty()) { - LOGGER.info( - "Load skip TsFile {}, because it has no data.", - node.getTsFileResource().getTsFilePath()); + synchronized (LOADING_FILE_SET) { + if (LOADING_FILE_SET.contains(filePath)) { + throw new LoadFileException( + String.format("TsFile %s is loading by another scheduler.", filePath)); + } + LOADING_FILE_SET.add(filePath); + } + shouldRemoveFileFromLoadingSet = true; + if (node.isTsFileEmpty()) { + LOGGER.info("Load skip TsFile {}, because it has no data.", filePath); } else if (!node.needDecodeTsFile( slotList -> partitionFetcher.queryDataPartition( @@ -167,7 +179,6 @@ public class LoadTsFileScheduler implements IScheduler { queryContext.getSession().getUserName()))) { // do not decode, load locally isLoadSingleTsFileSuccess = loadLocally(node); node.clean(); - } else { // need decode, load locally or remotely, use two phases method String uuid = UUID.randomUUID().toString(); dispatcher.setUuid(uuid); @@ -182,33 +193,36 @@ public class LoadTsFileScheduler implements IScheduler { isLoadSingleTsFileSuccess = false; } } + if (isLoadSingleTsFileSuccess) { LOGGER.info( "Load TsFile {} Successfully, load process [{}/{}]", - node.getTsFileResource().getTsFilePath(), + filePath, i + 1, tsFileNodeListSize); } else { isLoadSuccess = false; LOGGER.warn( "Can not Load TsFile {}, load process [{}/{}]", - node.getTsFileResource().getTsFilePath(), + filePath, i + 1, tsFileNodeListSize); } } catch (Exception e) { isLoadSuccess = false; stateMachine.transitionToFailed(e); - LOGGER.warn( - "LoadTsFileScheduler loads TsFile {} error", - node.getTsFileResource().getTsFilePath(), - e); + LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e); + } finally { + if (shouldRemoveFileFromLoadingSet) { + synchronized (LOADING_FILE_SET) { + LOADING_FILE_SET.remove(filePath); + } + } } } if (isLoadSuccess) { stateMachine.transitionToFinished(); } - } finally { LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock(); }
