This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/FixBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/FixBug by this push:
new e935217765a more quite
e935217765a is described below
commit e935217765a455771942897ee63907e85b9baf0b
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Aug 29 12:42:54 2024 +0800
more quite
---
.../scheduler/FixedRateFragInsStateTracker.java | 52 +++++++++++++---------
1 file changed, 30 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 608abea2ba7..a1932753d46 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -88,10 +88,7 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
return res;
}
for (FragmentInstanceId fragmentInstanceId : instanceIds) {
- InstanceStateMetrics stateMetrics =
instanceStateMap.get(fragmentInstanceId);
- if (stateMetrics == null
- || stateMetrics.lastState == null
- || !stateMetrics.lastState.isDone()) {
+ if (unfinished(fragmentInstanceId)) {
// FI whose state has not been updated is considered to be
unfinished.(In Query with limit
// clause, it's possible that the query is finished before the state
of FI being recorded.)
res.add(fragmentInstanceId);
@@ -100,6 +97,15 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
return res;
}
+ private boolean unfinished(FragmentInstanceId fragmentInstanceId) {
+ InstanceStateMetrics stateMetrics =
instanceStateMap.get(fragmentInstanceId);
+ // FI whose state has not been updated is considered to be unfinished.(In
Query with limit
+ // clause, it's possible that the query is finished before the state of FI
being recorded.)
+ return stateMetrics == null
+ || stateMetrics.lastState == null
+ || !stateMetrics.lastState.isDone();
+ }
+
@Override
public synchronized void abort() {
aborted = true;
@@ -117,27 +123,29 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
private void fetchStateAndUpdate() {
for (FragmentInstance instance : instances) {
- try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
- FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
- synchronized (this) {
- InstanceStateMetrics metrics =
- instanceStateMap.computeIfAbsent(
- instance.getId(), k -> new
InstanceStateMetrics(instance.isRoot()));
- if (needPrintState(
- metrics.lastState, instanceInfo.getState(),
metrics.durationToLastPrintInMS)) {
- if (logger.isDebugEnabled()) {
- logger.debug("[PrintFIState] state is {}",
instanceInfo.getState());
+ if (unfinished(instance.getId())) {
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
+ synchronized (this) {
+ InstanceStateMetrics metrics =
+ instanceStateMap.computeIfAbsent(
+ instance.getId(), k -> new
InstanceStateMetrics(instance.isRoot()));
+ if (needPrintState(
+ metrics.lastState, instanceInfo.getState(),
metrics.durationToLastPrintInMS)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[PrintFIState] state is {}",
instanceInfo.getState());
+ }
+ metrics.reset(instanceInfo.getState());
+ } else {
+ metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
}
- metrics.reset(instanceInfo.getState());
- } else {
- metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
- }
- updateQueryState(instance.getId(), instanceInfo);
+ updateQueryState(instance.getId(), instanceInfo);
+ }
+ } catch (ClientManagerException | TException e) {
+ // TODO: do nothing ?
+ logger.warn("error happened while fetching query state", e);
}
- } catch (ClientManagerException | TException e) {
- // TODO: do nothing ?
- logger.warn("error happened while fetching query state", e);
}
}
}