Fixing pacemaker delete_pulse

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a64f135d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a64f135d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a64f135d

Branch: refs/heads/master
Commit: a64f135d2859d178e4645787e99638a84e1b192f
Parents: 15887ee
Author: Kyle Nusbaum <[email protected]>
Authored: Tue Feb 21 17:14:09 2017 -0600
Committer: Kyle Nusbaum <[email protected]>
Committed: Tue Feb 21 17:14:09 2017 -0600

----------------------------------------------------------------------
 .../storm/cluster/PaceMakerStateStorage.java | 34 ++++++++++++++++----
 .../org/apache/storm/pacemaker/Pacemaker.java | 1 +
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/a64f135d/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index 3d7d402..e58746d 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -210,21 +210,41 @@ public class PaceMakerStateStorage implements IStateStorage {
     @Override
     public void delete_worker_hb(String path) {
         int retry = maxRetries;
+        boolean someSucceeded;
         while (true) {
+            someSucceeded = false;
             try {
                 HBMessage message = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
-                HBMessage response = pacemakerClientPool.send(message);
-                if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) {
-                    throw new HBExecutionException("Invalid Response Type");
+                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+                boolean allSucceeded = true;
+                for(HBMessage response : responses) {
+                    if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) {
+                        LOG.debug("Failed to delete heartbeat {}", response);
+                        allSucceeded = false;
+                    }
+                    else {
+                        someSucceeded = true;
+                    }
+                }
+                if(allSucceeded) {
+                    break;
+                }
+                else {
+                    throw new HBExecutionException("Failed to delete from all pacemakers.");
                 }
-                LOG.debug("Successful get_worker_hb");
-                break;
             } catch (Exception e) {
                 if (retry <= 0) {
-                    throw Utils.wrapInRuntime(e);
+                    if(someSucceeded) {
+                        LOG.warn("Unable to delete_worker_hb from every pacemaker.");
+                        break;
+                    }
+                    else {
+                        LOG.error("Unable to delete_worker_hb from any pacemaker.");
+                        throw Utils.wrapInRuntime(e);
+                    }
                 }
                 retry--;
-                LOG.error("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+                LOG.debug("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a64f135d/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
index 0becf68..c49e5a0 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
@@ -190,6 +190,7 @@ public class Pacemaker implements IServerMessageHandler {
     private HBMessage deletePath(String path) {
         String prefix = path.endsWith("/") ? path : (path + "/");
         for (String key : heartbeats.keySet()) {
+            key = key + "/";
             if (key.indexOf(prefix) == 0)
                 deletePulseId(key);
         }
