Syncing java files with master.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8e121dce Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8e121dce Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8e121dce Branch: refs/heads/Pacemaker-HA-1.x-branch2 Commit: 8e121dceedcedbddeefe2b702e8478cd3de72da4 Parents: 737847c Author: Kyle Nusbaum <kylejnusb...@gmail.com> Authored: Mon Oct 10 21:03:25 2016 -0500 Committer: Kyle Nusbaum <kylejnusb...@gmail.com> Committed: Mon Oct 10 21:03:25 2016 -0500 ---------------------------------------------------------------------- .../apache/storm/pacemaker/PacemakerClient.java | 50 ++++++++++---------- .../pacemaker/PacemakerConnectionException.java | 24 ++++++++++ 2 files changed, 48 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8e121dce/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java index 67201b8..cbbfe7f 100644 --- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java @@ -55,12 +55,18 @@ public class PacemakerClient implements ISaslClient { private HBMessage messages[]; private LinkedBlockingQueue<Integer> availableMessageSlots; private ThriftNettyClientCodec.AuthMethod authMethod; + private static Timer timer = new Timer(true); private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20); private int retryTimes = 0; + //the constructor is invoked by pacemaker-state-factory-test + public PacemakerClient() { + bootstrap = new ClientBootstrap(); + } public PacemakerClient(Map config, String host) { + int port = (int)config.get(Config.PACEMAKER_PORT); client_name = (String)config.get(Config.TOPOLOGY_NAME); if(client_name == null) { @@ -143,10 +149,6 @@ public class PacemakerClient implements ISaslClient { retryTimes = 0; } - public boolean isReady() { - return ready.get(); - } - @Override public synchronized void channelReady() { LOG.debug("Channel is ready."); @@ -162,8 +164,7 @@ public class PacemakerClient implements ISaslClient { return secret; } - public HBMessage send(HBMessage m) { - waitUntilReady(); + public HBMessage send(HBMessage m) throws PacemakerConnectionException { LOG.debug("Sending message: {}", m.toString()); try { int next = availableMessageSlots.take(); @@ -172,14 +173,11 @@ public class PacemakerClient implements ISaslClient { messages[next] = m; LOG.debug("Put message in slot: {}", Integer.toString(next)); do { - try { - if (channelRef.get() != null) { - channelRef.get().write(m); - m.wait(1000); - } - } catch (Exception exp) { - LOG.error("error attempting to write to a channel {}", exp); - waitUntilReady(); + waitUntilReady(); + Channel channel = channelRef.get(); + if(channel != null) { + channel.write(m); + m.wait(1000); } } while (messages[next] == m); } @@ -199,16 +197,16 @@ public class PacemakerClient implements ISaslClient { } } - public PacemakerClient waitUntilReady() { + private void waitUntilReady() throws PacemakerConnectionException { // Wait for 'ready' (channel connected and maybe authentication) - if(!isReady() || channelRef.get() == null) { + if(!ready.get() || channelRef.get() == null) { synchronized(this) { - if(!isReady()) { + if(!ready.get()) { LOG.debug("Waiting for netty channel to be ready."); try { this.wait(1000); - if(!isReady() || channelRef.get() == null) { - throw new RuntimeException("Timed out waiting for channel ready."); + if(!ready.get() || channelRef.get() == null) { + throw new PacemakerConnectionException("Timed out waiting for channel ready."); } } catch (InterruptedException e) { throw new RuntimeException(e); @@ -216,14 +214,13 @@ public class PacemakerClient implements ISaslClient { } } } - return this; } public void gotMessage(HBMessage m) { int message_id = m.get_message_id(); - if(message_id >= 0 && message_id < maxPending) { + if(message_id >=0 && message_id < maxPending) { - LOG.debug("Pacemaker Client got message: {}", m.toString()); + LOG.debug("Pacemaker client got message: {}", m.toString()); HBMessage request = messages[message_id]; if(request == null) { @@ -245,10 +242,11 @@ public class PacemakerClient implements ISaslClient { public void reconnect() { final PacemakerClient client = this; timer.schedule(new TimerTask() { - public void run() { - client.doReconnect(); - } - }, backoff.getSleepTimeMs(retryTimes++, 0)); + public void run() { + client.doReconnect(); + } + }, + backoff.getSleepTimeMs(retryTimes++, 0)); ready.set(false); setupMessaging(); } http://git-wip-us.apache.org/repos/asf/storm/blob/8e121dce/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java new file mode 100644 index 0000000..e2ad119 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.pacemaker; + +public class PacemakerConnectionException extends Exception { + public PacemakerConnectionException(String err) { + super(err); + } +}