This is an automated email from the ASF dual-hosted git repository.

jnioche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b28818da STORM-3984: Nimbus failover causes unnecessary reassign if 
600s are passed after starting Nimbus.
     new 304645d92 Merge pull request #3583 from iwasakims/STORM-3984
8b28818da is described below

commit 8b28818da42a8506ab21c4cd70e511a8cc5c4cca
Author: Masatake Iwasaki <[email protected]>
AuthorDate: Mon Sep 25 23:29:39 2023 +0900

    STORM-3984: Nimbus failover causes unnecessary reassign if 600s are passed 
after starting Nimbus.
    
    Signed-off-by: Masatake Iwasaki <[email protected]>
---
 .../storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
index a8ec26af1..1a8414036 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
@@ -17,6 +17,7 @@ import static java.util.stream.Collectors.toSet;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.storm.Config;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
@@ -38,19 +39,20 @@ public class TimeOutWorkerHeartbeatsRecoveryStrategy 
implements IWorkerHeartbeat
 
     private static int NODE_MAX_TIMEOUT_SECS = 600;
 
-    private long startTimeSecs;
+    private AtomicLong startTimeSecs;
 
     private Set<String> reportedIds;
 
     @Override
     public void prepare(Map conf) {
         NODE_MAX_TIMEOUT_SECS = 
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_HEARTBEATS_MAX_TIMEOUT_SECS),
 600);
-        this.startTimeSecs = Time.currentTimeMillis() / 1000L;
+        this.startTimeSecs = new AtomicLong(0L);
         this.reportedIds = new HashSet<>();
     }
 
     @Override
     public boolean isReady(Set<String> nodeIds) {
+        startTimeSecs.compareAndSet(0L, Time.currentTimeMillis() / 1000L);
         if (exceedsMaxTimeOut()) {
             Set<String> tmp = nodeIds.stream().filter(id -> 
!this.reportedIds.contains(id)).collect(toSet());
             LOG.warn("Failed to recover heartbeats for nodes: {} with timeout 
{}s", tmp, NODE_MAX_TIMEOUT_SECS);
@@ -66,7 +68,7 @@ public class TimeOutWorkerHeartbeatsRecoveryStrategy 
implements IWorkerHeartbeat
     }
 
     private boolean exceedsMaxTimeOut() {
-        return (Time.currentTimeMillis() / 1000L - this.startTimeSecs) > 
NODE_MAX_TIMEOUT_SECS;
+        return (Time.currentTimeMillis() / 1000L - this.startTimeSecs.get()) > 
NODE_MAX_TIMEOUT_SECS;
     }
 
 }

Reply via email to