Fixing pacemaker delete_pulse

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a64f135d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a64f135d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a64f135d

Branch: refs/heads/master
Commit: a64f135d2859d178e4645787e99638a84e1b192f
Parents: 15887ee
Author: Kyle Nusbaum <[email protected]>
Authored: Tue Feb 21 17:14:09 2017 -0600
Committer: Kyle Nusbaum <[email protected]>
Committed: Tue Feb 21 17:14:09 2017 -0600

----------------------------------------------------------------------
 .../storm/cluster/PaceMakerStateStorage.java    | 34 ++++++++++++++++----
 .../org/apache/storm/pacemaker/Pacemaker.java   |  1 +
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a64f135d/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java 
b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index 3d7d402..e58746d 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -210,21 +210,41 @@ public class PaceMakerStateStorage implements 
IStateStorage {
     @Override
     public void delete_worker_hb(String path) {
         int retry = maxRetries;
+        boolean someSucceeded;
         while (true) {
+            someSucceeded = false;
             try {
                 HBMessage message = new 
HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
-                HBMessage response = pacemakerClientPool.send(message);
-                if (response.get_type() != 
HBServerMessageType.DELETE_PATH_RESPONSE) {
-                    throw new HBExecutionException("Invalid Response Type");
+                List<HBMessage> responses = 
pacemakerClientPool.sendAll(message);
+                boolean allSucceeded = true;
+                for(HBMessage response : responses) {
+                    if (response.get_type() != 
HBServerMessageType.DELETE_PATH_RESPONSE) {
+                        LOG.debug("Failed to delete heartbeat {}", response);
+                        allSucceeded = false;
+                    }
+                    else {
+                        someSucceeded = true;
+                    }
+                }
+                if(allSucceeded) {
+                    break;
+                }
+                else {
+                    throw new HBExecutionException("Failed to delete from all 
pacemakers.");
                 }
-                LOG.debug("Successful get_worker_hb");
-                break;
             } catch (Exception e) {
                 if (retry <= 0) {
-                    throw Utils.wrapInRuntime(e);
+                    if(someSucceeded) {
+                        LOG.warn("Unable to delete_worker_hb from every 
pacemaker.");
+                        break;
+                    }
+                    else {
+                        LOG.error("Unable to delete_worker_hb from any 
pacemaker.");
+                        throw Utils.wrapInRuntime(e);
+                    }
                 }
                 retry--;
-                LOG.error("{} Failed to delete_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry);
+                LOG.debug("{} Failed to delete_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a64f135d/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java 
b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
index 0becf68..c49e5a0 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
@@ -190,6 +190,7 @@ public class Pacemaker implements IServerMessageHandler {
     private HBMessage deletePath(String path) {
         String prefix = path.endsWith("/") ? path : (path + "/");
         for (String key : heartbeats.keySet()) {
+            key = key + "/";
             if (key.indexOf(prefix) == 0)
                 deletePulseId(key);
         }

Reply via email to