Syncing java files with master.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8e121dce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8e121dce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8e121dce

Branch: refs/heads/Pacemaker-HA-1.x-branch2
Commit: 8e121dceedcedbddeefe2b702e8478cd3de72da4
Parents: 737847c
Author: Kyle Nusbaum <kylejnusb...@gmail.com>
Authored: Mon Oct 10 21:03:25 2016 -0500
Committer: Kyle Nusbaum <kylejnusb...@gmail.com>
Committed: Mon Oct 10 21:03:25 2016 -0500

----------------------------------------------------------------------
 .../apache/storm/pacemaker/PacemakerClient.java | 50 ++++++++++----------
 .../pacemaker/PacemakerConnectionException.java | 24 ++++++++++
 2 files changed, 48 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8e121dce/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index 67201b8..cbbfe7f 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -55,12 +55,18 @@ public class PacemakerClient implements ISaslClient {
     private HBMessage messages[];
     private LinkedBlockingQueue<Integer> availableMessageSlots;
     private ThriftNettyClientCodec.AuthMethod authMethod;
+
     private static Timer timer = new Timer(true);
 
     private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
     private int retryTimes = 0;
 
+    //the constructor is invoked by pacemaker-state-factory-test
+    public PacemakerClient() {
+        bootstrap = new ClientBootstrap();
+    }
     public PacemakerClient(Map config, String host) {
+
         int port = (int)config.get(Config.PACEMAKER_PORT);
         client_name = (String)config.get(Config.TOPOLOGY_NAME);
         if(client_name == null) {
@@ -143,10 +149,6 @@ public class PacemakerClient implements ISaslClient {
         retryTimes = 0;
     }
 
-    public boolean isReady() {
-        return ready.get();
-    }
-
     @Override
     public synchronized void channelReady() {
         LOG.debug("Channel is ready.");
@@ -162,8 +164,7 @@ public class PacemakerClient implements ISaslClient {
         return secret;
     }
 
-    public HBMessage send(HBMessage m) {
-        waitUntilReady();
+    public HBMessage send(HBMessage m) throws PacemakerConnectionException {
         LOG.debug("Sending message: {}", m.toString());
         try {
             int next = availableMessageSlots.take();
@@ -172,14 +173,11 @@ public class PacemakerClient implements ISaslClient {
                 messages[next] = m;
                 LOG.debug("Put message in slot: {}", Integer.toString(next));
                 do {
-                    try {
-                        if (channelRef.get() != null) {
-                            channelRef.get().write(m);
-                            m.wait(1000);
-                        }
-                    } catch (Exception exp) {
-                        LOG.error("error attempting to write to a channel {}", exp);
-                        waitUntilReady();
+                    waitUntilReady();
+                    Channel channel = channelRef.get();
+                    if(channel != null) {
+                        channel.write(m);
+                        m.wait(1000);
                     }
                 } while (messages[next] == m);
             }
@@ -199,16 +197,16 @@ public class PacemakerClient implements ISaslClient {
         }
     }
 
-    public PacemakerClient waitUntilReady() {
+    private void waitUntilReady() throws PacemakerConnectionException {
         // Wait for 'ready' (channel connected and maybe authentication)
-        if(!isReady() || channelRef.get() == null) {
+        if(!ready.get() || channelRef.get() == null) {
             synchronized(this) {
-                if(!isReady()) {
+                if(!ready.get()) {
                     LOG.debug("Waiting for netty channel to be ready.");
                     try {
                         this.wait(1000);
-                        if(!isReady() || channelRef.get() == null) {
-                            throw new RuntimeException("Timed out waiting for channel ready.");
+                        if(!ready.get() || channelRef.get() == null) {
+                            throw new PacemakerConnectionException("Timed out waiting for channel ready.");
                         }
                     } catch (InterruptedException e) {
                         throw new RuntimeException(e);
@@ -216,14 +214,13 @@ public class PacemakerClient implements ISaslClient {
                 }
             }
         }
-        return this;
     }
 
     public void gotMessage(HBMessage m) {
         int message_id = m.get_message_id();
-        if(message_id >= 0 && message_id < maxPending) {
+        if(message_id >=0 && message_id < maxPending) {
 
-            LOG.debug("Pacemaker Client got message: {}", m.toString());
+            LOG.debug("Pacemaker client got message: {}", m.toString());
             HBMessage request = messages[message_id];
 
             if(request == null) {
@@ -245,10 +242,11 @@ public class PacemakerClient implements ISaslClient {
     public void reconnect() {
         final PacemakerClient client = this;
         timer.schedule(new TimerTask() {
-                              public void run() {
-                                 client.doReconnect();
-                              }
-                           }, backoff.getSleepTimeMs(retryTimes++, 0));
+                public void run() {
+                    client.doReconnect();
+                }
+            },
+            backoff.getSleepTimeMs(retryTimes++, 0));
         ready.set(false);
         setupMessaging();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8e121dce/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java
new file mode 100644
index 0000000..e2ad119
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerConnectionException.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.pacemaker;
+
+public class PacemakerConnectionException extends Exception {
+    public PacemakerConnectionException(String err) {
+        super(err);
+    }
+}

Reply via email to