This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d0f517de5 Add more log to the cancellation operation of
FragmentInstanceStateTracker (#6647)
7d0f517de5 is described below
commit 7d0f517de5616ab33d5021dac083ad296bcf6bd8
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed Jul 13 09:07:36 2022 +0800
Add more log to the cancellation operation of FragmentInstanceStateTracker
(#6647)
---
.../scheduler/FixedRateFragInsStateTracker.java | 29 ++++++++++++++++++++--
1 file changed, 27 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 6388e35e18..55e93974d9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -43,9 +43,13 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
private static final Logger logger =
LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
+ private static final long SAME_STATE_PRINT_RATE_IN_MS = 10 * 60 * 1000;
+
// TODO: (xingtanzjr) consider how much Interval is OK for state tracker
private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
private ScheduledFuture<?> trackTask;
+ private volatile FragmentInstanceState lastState;
+ private volatile long durationToLastPrintInMS;
public FixedRateFragInsStateTracker(
QueryStateMachine stateMachine,
@@ -69,8 +73,15 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
@Override
public void abort() {
+ logger.info("start to abort state tracker");
if (trackTask != null) {
- trackTask.cancel(true);
+ logger.info("start to cancel fixed rate tracking task");
+ boolean cancelResult = trackTask.cancel(true);
+ if (!cancelResult) {
+ logger.error("cancel state tracking task failed. {}",
trackTask.isCancelled());
+ } else {
+ logger.info("cancellation succeeds");
+ }
}
}
@@ -78,7 +89,13 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
for (FragmentInstance instance : instances) {
try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
FragmentInstanceState state = fetchState(instance);
- logger.info("State is {}", state);
+ if (needPrintState(lastState, state, durationToLastPrintInMS)) {
+ logger.info("State is {}", state);
+ lastState = state;
+ durationToLastPrintInMS = 0;
+ } else {
+ durationToLastPrintInMS += STATE_FETCH_INTERVAL_IN_MS;
+ }
if (state != null) {
stateMachine.updateFragInstanceState(instance.getId(), state);
@@ -89,4 +106,12 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
}
}
}
+
+ private boolean needPrintState(
+ FragmentInstanceState previous, FragmentInstanceState current, long
durationToLastPrintInMS) {
+ if (current != previous) {
+ return true;
+ }
+ return durationToLastPrintInMS >= SAME_STATE_PRINT_RATE_IN_MS;
+ }
}