This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9e5f0396e10 Load: Execute tsfile serially to avoid concurrent
serialization of the resource file (#12434)
9e5f0396e10 is described below
commit 9e5f0396e10e1bbe081820439e35c03bc1db77f8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Apr 28 10:41:55 2024 +0800
Load: Execute tsfile serially to avoid concurrent serialization of the
resource file (#12434)
---
.../plan/scheduler/load/LoadTsFileScheduler.java | 40 +++++++++++++++-------
1 file changed, 27 insertions(+), 13 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index cbc3775f311..e166fa93856 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -112,6 +113,8 @@ public class LoadTsFileScheduler implements IScheduler {
private static final int TRANSMIT_LIMIT =
CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
+ private static final Set<String> LOADING_FILE_SET = new HashSet<>();
+
private final MPPQueryContext queryContext;
private final QueryStateMachine stateMachine;
private final LoadTsFileDispatcherImpl dispatcher;
@@ -152,14 +155,23 @@ public class LoadTsFileScheduler implements IScheduler {
boolean isLoadSuccess = true;
for (int i = 0; i < tsFileNodeListSize; ++i) {
- LoadSingleTsFileNode node = tsFileNodeList.get(i);
+ final LoadSingleTsFileNode node = tsFileNodeList.get(i);
+ final String filePath = node.getTsFileResource().getTsFilePath();
+
boolean isLoadSingleTsFileSuccess = true;
+ boolean shouldRemoveFileFromLoadingSet = false;
try {
- if (node.isTsFileEmpty()) {
- LOGGER.info(
- "Load skip TsFile {}, because it has no data.",
- node.getTsFileResource().getTsFilePath());
+ synchronized (LOADING_FILE_SET) {
+ if (LOADING_FILE_SET.contains(filePath)) {
+ throw new LoadFileException(
+ String.format("TsFile %s is loading by another scheduler.",
filePath));
+ }
+ LOADING_FILE_SET.add(filePath);
+ }
+ shouldRemoveFileFromLoadingSet = true;
+ if (node.isTsFileEmpty()) {
+ LOGGER.info("Load skip TsFile {}, because it has no data.",
filePath);
} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
@@ -167,7 +179,6 @@ public class LoadTsFileScheduler implements IScheduler {
queryContext.getSession().getUserName()))) { // do not
decode, load locally
isLoadSingleTsFileSuccess = loadLocally(node);
node.clean();
-
} else { // need decode, load locally or remotely, use two phases
method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
@@ -182,33 +193,36 @@ public class LoadTsFileScheduler implements IScheduler {
isLoadSingleTsFileSuccess = false;
}
}
+
if (isLoadSingleTsFileSuccess) {
LOGGER.info(
"Load TsFile {} Successfully, load process [{}/{}]",
- node.getTsFileResource().getTsFilePath(),
+ filePath,
i + 1,
tsFileNodeListSize);
} else {
isLoadSuccess = false;
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
- node.getTsFileResource().getTsFilePath(),
+ filePath,
i + 1,
tsFileNodeListSize);
}
} catch (Exception e) {
isLoadSuccess = false;
stateMachine.transitionToFailed(e);
- LOGGER.warn(
- "LoadTsFileScheduler loads TsFile {} error",
- node.getTsFileResource().getTsFilePath(),
- e);
+ LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath,
e);
+ } finally {
+ if (shouldRemoveFileFromLoadingSet) {
+ synchronized (LOADING_FILE_SET) {
+ LOADING_FILE_SET.remove(filePath);
+ }
+ }
}
}
if (isLoadSuccess) {
stateMachine.transitionToFinished();
}
-
} finally {
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
}