This is an automated email from the ASF dual-hosted git repository.
jnioche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8b28818da STORM-3984: Nimbus failover causes unnecessary reassignment if
600s have passed since Nimbus started.
new 304645d92 Merge pull request #3583 from iwasakims/STORM-3984
8b28818da is described below
commit 8b28818da42a8506ab21c4cd70e511a8cc5c4cca
Author: Masatake Iwasaki <[email protected]>
AuthorDate: Mon Sep 25 23:29:39 2023 +0900
STORM-3984: Nimbus failover causes unnecessary reassignment if 600s have passed
since Nimbus started.
Signed-off-by: Masatake Iwasaki <[email protected]>
---
.../storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git
a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
index a8ec26af1..1a8414036 100644
---
a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
+++
b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
@@ -17,6 +17,7 @@ import static java.util.stream.Collectors.toSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
@@ -38,19 +39,20 @@ public class TimeOutWorkerHeartbeatsRecoveryStrategy
implements IWorkerHeartbeat
private static int NODE_MAX_TIMEOUT_SECS = 600;
- private long startTimeSecs;
+ private AtomicLong startTimeSecs;
private Set<String> reportedIds;
@Override
public void prepare(Map conf) {
NODE_MAX_TIMEOUT_SECS =
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS),
600);
- this.startTimeSecs = Time.currentTimeMillis() / 1000L;
+ this.startTimeSecs = new AtomicLong(0L);
this.reportedIds = new HashSet<>();
}
@Override
public boolean isReady(Set<String> nodeIds) {
+ startTimeSecs.compareAndSet(0L, Time.currentTimeMillis() / 1000L);
if (exceedsMaxTimeOut()) {
Set<String> tmp = nodeIds.stream().filter(id ->
!this.reportedIds.contains(id)).collect(toSet());
LOG.warn("Failed to recover heartbeats for nodes: {} with timeout
{}s", tmp, NODE_MAX_TIMEOUT_SECS);
@@ -66,7 +68,7 @@ public class TimeOutWorkerHeartbeatsRecoveryStrategy
implements IWorkerHeartbeat
}
private boolean exceedsMaxTimeOut() {
- return (Time.currentTimeMillis() / 1000L - this.startTimeSecs) >
NODE_MAX_TIMEOUT_SECS;
+ return (Time.currentTimeMillis() / 1000L - this.startTimeSecs.get()) >
NODE_MAX_TIMEOUT_SECS;
}
}