This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 2eda8bec8fd Fix query stuck while DN restarting & keep quiet while
cleaning sort tmp file
2eda8bec8fd is described below
commit 2eda8bec8fd1ba0fac187ed2da88be517256ebff
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Aug 29 16:35:36 2024 +0800
Fix query stuck while DN restarting & keep quiet while cleaning sort tmp
file
---
.../db/queryengine/execution/driver/Driver.java | 2 +-
.../fragment/FragmentInstanceExecution.java | 2 +-
.../execution/fragment/FragmentInstanceState.java | 2 +-
.../scheduler/FixedRateFragInsStateTracker.java | 60 ++++++++++++++--------
.../org/apache/iotdb/commons/utils/FileUtils.java | 10 +++-
5 files changed, 50 insertions(+), 26 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index 1211df6b74b..812c84298fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -445,7 +445,7 @@ public abstract class Driver implements IDriver {
if (!tmpPipeLineDir.exists()) {
return;
}
- FileUtils.deleteFileOrDirectory(tmpPipeLineDir);
+ FileUtils.deleteFileOrDirectory(tmpPipeLineDir, true);
}
private static Throwable addSuppressedException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index c42834f5190..d1c8cc8ff6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -358,7 +358,7 @@ public class FragmentInstanceExecution {
+ File.separator;
File tmpFile = new File(tmpFilePath);
if (tmpFile.exists()) {
- FileUtils.deleteFileOrDirectory(tmpFile);
+ FileUtils.deleteFileOrDirectory(tmpFile, true);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
index 77531e94494..092b1be3816 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
@@ -47,7 +47,7 @@ public enum FragmentInstanceState {
/** Instance execution failed. */
FAILED(true, true),
/** Instance is not found. */
- NO_SUCH_INSTANCE(false, false);
+ NO_SUCH_INSTANCE(false, true);
public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
Stream.of(FragmentInstanceState.values())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 31b5742a9d0..a1932753d46 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -88,10 +88,7 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
return res;
}
for (FragmentInstanceId fragmentInstanceId : instanceIds) {
- InstanceStateMetrics stateMetrics =
instanceStateMap.get(fragmentInstanceId);
- if (stateMetrics == null
- || stateMetrics.lastState == null
- || !stateMetrics.lastState.isDone()) {
+ if (unfinished(fragmentInstanceId)) {
// FI whose state has not been updated is considered to be
unfinished.(In Query with limit
// clause, it's possible that the query is finished before the state
of FI being recorded.)
res.add(fragmentInstanceId);
@@ -100,6 +97,15 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
return res;
}
+ private boolean unfinished(FragmentInstanceId fragmentInstanceId) {
+ InstanceStateMetrics stateMetrics =
instanceStateMap.get(fragmentInstanceId);
+ // FI whose state has not been updated is considered to be unfinished.(In
Query with limit
+ // clause, it's possible that the query is finished before the state of FI
being recorded.)
+ return stateMetrics == null
+ || stateMetrics.lastState == null
+ || !stateMetrics.lastState.isDone();
+ }
+
@Override
public synchronized void abort() {
aborted = true;
@@ -117,32 +123,42 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
private void fetchStateAndUpdate() {
for (FragmentInstance instance : instances) {
- try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
- FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
- synchronized (this) {
- InstanceStateMetrics metrics =
- instanceStateMap.computeIfAbsent(
- instance.getId(), k -> new
InstanceStateMetrics(instance.isRoot()));
- if (needPrintState(
- metrics.lastState, instanceInfo.getState(),
metrics.durationToLastPrintInMS)) {
- if (logger.isDebugEnabled()) {
- logger.debug("[PrintFIState] state is {}",
instanceInfo.getState());
+ if (unfinished(instance.getId())) {
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
+ synchronized (this) {
+ InstanceStateMetrics metrics =
+ instanceStateMap.computeIfAbsent(
+ instance.getId(), k -> new
InstanceStateMetrics(instance.isRoot()));
+ if (needPrintState(
+ metrics.lastState, instanceInfo.getState(),
metrics.durationToLastPrintInMS)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[PrintFIState] state is {}",
instanceInfo.getState());
+ }
+ metrics.reset(instanceInfo.getState());
+ } else {
+ metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
}
- metrics.reset(instanceInfo.getState());
- } else {
- metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
- }
- updateQueryState(instance.getId(), instanceInfo);
+ updateQueryState(instance.getId(), instanceInfo);
+ }
+ } catch (ClientManagerException | TException e) {
+ // TODO: do nothing ?
+ logger.warn("error happened while fetching query state", e);
}
- } catch (ClientManagerException | TException e) {
- // TODO: do nothing ?
- logger.warn("error happened while fetching query state", e);
}
}
}
private void updateQueryState(FragmentInstanceId instanceId,
FragmentInstanceInfo instanceInfo) {
+ // no such instance may be caused by DN restarting
+ if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) {
+ stateMachine.transitionToFailed(
+ new RuntimeException(
+ String.format(
+ "FragmentInstance[%s] is failed. %s, may be caused by DN
restarting.",
+ instanceId, instanceInfo.getMessage())));
+ }
if (instanceInfo.getState().isFailed()) {
if (instanceInfo.getFailureInfoList() == null
|| instanceInfo.getFailureInfoList().isEmpty()) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index 71498bbe220..1fca92394af 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -62,6 +62,17 @@ public class FileUtils {
}
public static void deleteFileOrDirectory(File file) {
+ deleteFileOrDirectory(file, false);
+ }
+
+ /**
+ * Deletes {@code file}, recursing into directories first. Failures from
+ * Files.delete are logged as warnings rather than thrown.
+ *
+ * @param quietForNoSuchFile if true, a path that no longer exists is
+ * skipped without a warning; the flag is propagated to nested children
+ */
+ public static void deleteFileOrDirectory(File file, boolean
quietForNoSuchFile) {
if (file.isDirectory()) {
for (File subfile : file.listFiles()) {
- deleteFileOrDirectory(subfile);
+ deleteFileOrDirectory(subfile, quietForNoSuchFile);
@@ -69,7 +80,11 @@ public class FileUtils {
}
try {
Files.delete(file.toPath());
- } catch (NoSuchFileException | DirectoryNotEmptyException e) {
+ } catch (NoSuchFileException e) {
+ if (!quietForNoSuchFile) {
+ LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
+ }
+ } catch (DirectoryNotEmptyException e) {
LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
} catch (Exception e) {
LOGGER.warn("{}: {}", e.getMessage(), file.getName(), e);