Repository: activemq-artemis
Updated Branches:
  refs/heads/master 914d93f63 -> a3ae2c4ad


ARTEMIS-474 Clustering fails on certain topologies

Communication between nodes fails under certain topologies.
JGroups has a ForkChannel (JForkChannel) that can be used on container
systems and injected into Artemis.
For reasons not yet understood, such a channel cannot be reused for more than one channel per VM,
and it can never be closed.

I am keeping the trace logging I added to debug this issue, in case anything similar
happens again.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/630db2d6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/630db2d6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/630db2d6

Branch: refs/heads/master
Commit: 630db2d69c1de2a497169cb601390b55cd1cdd19
Parents: d6c7e30
Author: Clebert Suconic <[email protected]>
Authored: Thu Apr 14 17:51:18 2016 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Thu Apr 14 18:55:01 2016 -0400

----------------------------------------------------------------------
 .../core/ChannelBroadcastEndpointFactory.java   |  54 +++++-
 .../api/core/JGroupsBroadcastEndpoint.java      | 167 ++-----------------
 .../core/JGroupsChannelBroadcastEndpoint.java   |   6 +-
 .../api/core/JGroupsFileBroadcastEndpoint.java  |   5 +-
 .../JGroupsFileBroadcastEndpointFactory.java    |   7 +-
 .../JGroupsPropertiesBroadcastEndpoint.java     |   8 +-
 ...roupsPropertiesBroadcastEndpointFactory.java |   6 +-
 .../api/core/jgroups/JChannelManager.java       |  62 +++++++
 .../api/core/jgroups/JChannelWrapper.java       | 145 ++++++++++++++++
 .../api/core/jgroups/JGroupsReceiver.java       |  72 ++++++++
 .../core/client/impl/ServerLocatorImpl.java     |   4 +-
 .../core/server/cluster/ClusterController.java  |   3 +-
 .../impl/SharedNothingBackupActivation.java     | 108 +++++++++++-
 13 files changed, 474 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
index be5e04c..d7086a5 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+import org.jboss.logging.Logger;
 import org.jgroups.JChannel;
 
 /**
@@ -25,11 +30,49 @@ import org.jgroups.JChannel;
  */
 public class ChannelBroadcastEndpointFactory implements 
BroadcastEndpointFactory {
 
+   private static final Logger logger = 
Logger.getLogger(ChannelBroadcastEndpointFactory.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
    private final JChannel channel;
 
    private final String channelName;
 
+   private final JChannelManager manager;
+
+   private static final Map<JChannel, JChannelManager> managers = new 
ConcurrentHashMap<>();
+
+   private static final JChannelManager singletonManager = new 
JChannelManager();
+//  TODO: To implement this when JForkChannel from JGroups supports multiple 
channels properly
+//
+//   private static JChannelManager recoverManager(JChannel channel) {
+//      JChannelManager manager = managers.get(channel);
+//      if (manager == null) {
+//         if (isTrace) {
+//            logger.trace("Creating a new JChannelManager for " + channel, 
new Exception("trace"));
+//         }
+//         manager = new JChannelManager();
+//         managers.put(channel, manager);
+//      }
+//      else {
+//         if (isTrace) {
+//            logger.trace("Recover an already existent channelManager for " + 
channel, new Exception("trace"));
+//         }
+//
+//      }
+//
+//      return manager;
+//   }
+//
    public ChannelBroadcastEndpointFactory(JChannel channel, String 
channelName) {
+      // TODO: use recoverManager(channel)
+      this(singletonManager, channel, channelName);
+   }
+
+   private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel 
channel, String channelName) {
+      if (isTrace) {
+         logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " 
+ channel + ", " + channelName, new Exception("trace"));
+      }
+      this.manager = manager;
       this.channel = channel;
       this.channelName = channelName;
    }
@@ -43,7 +86,16 @@ public class ChannelBroadcastEndpointFactory implements 
BroadcastEndpointFactory
    }
 
    @Override
+   public String toString() {
+      return "ChannelBroadcastEndpointFactory{" +
+         "channel=" + channel +
+         ", channelName='" + channelName + '\'' +
+         ", manager=" + manager +
+         '}';
+   }
+
+   @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
-      return new JGroupsChannelBroadcastEndpoint(channel, 
channelName).initChannel();
+      return new JGroupsChannelBroadcastEndpoint(manager, channel, 
channelName).initChannel();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
index 5bcddbc..7657b0b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
@@ -16,15 +16,11 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
+import org.apache.activemq.artemis.api.core.jgroups.JGroupsReceiver;
+import org.jboss.logging.Logger;
 import org.jgroups.JChannel;
-import org.jgroups.ReceiverAdapter;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -32,6 +28,9 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
 
+   private static final Logger logger = 
Logger.getLogger(JGroupsBroadcastEndpoint.class);
+
+   private static final boolean isTrace = logger.isTraceEnabled();
    private final String channelName;
 
    private boolean clientOpened;
@@ -42,12 +41,16 @@ public abstract class JGroupsBroadcastEndpoint implements 
BroadcastEndpoint {
 
    private JGroupsReceiver receiver;
 
-   public JGroupsBroadcastEndpoint(String channelName) {
+   private JChannelManager manager;
+
+   public JGroupsBroadcastEndpoint(JChannelManager manager, String 
channelName) {
+      this.manager = manager;
       this.channelName = channelName;
    }
 
    @Override
    public void broadcast(final byte[] data) throws Exception {
+      if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + 
broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
       if (broadcastOpened) {
          org.jgroups.Message msg = new org.jgroups.Message();
 
@@ -59,6 +62,7 @@ public abstract class JGroupsBroadcastEndpoint implements 
BroadcastEndpoint {
 
    @Override
    public byte[] receiveBroadcast() throws Exception {
+      if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + 
clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
       if (clientOpened) {
          return receiver.receiveBroadcast();
       }
@@ -69,6 +73,7 @@ public abstract class JGroupsBroadcastEndpoint implements 
BroadcastEndpoint {
 
    @Override
    public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
+      if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + 
clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
       if (clientOpened) {
          return receiver.receiveBroadcast(time, unit);
       }
@@ -99,7 +104,7 @@ public abstract class JGroupsBroadcastEndpoint implements 
BroadcastEndpoint {
    public abstract JChannel createChannel() throws Exception;
 
    public JGroupsBroadcastEndpoint initChannel() throws Exception {
-      this.channel = JChannelManager.getJChannel(channelName, this);
+      this.channel = manager.getJChannel(channelName, this);
       return this;
    }
 
@@ -128,146 +133,4 @@ public abstract class JGroupsBroadcastEndpoint implements 
BroadcastEndpoint {
       channel.close(true);
    }
 
-   /**
-    * This class is used to receive messages from a JGroups channel.
-    * Incoming messages are put into a queue.
-    */
-   private static final class JGroupsReceiver extends ReceiverAdapter {
-
-      private final BlockingQueue<byte[]> dequeue = new 
LinkedBlockingDeque<>();
-
-      @Override
-      public void receive(org.jgroups.Message msg) {
-         dequeue.add(msg.getBuffer());
-      }
-
-      public byte[] receiveBroadcast() throws Exception {
-         return dequeue.take();
-      }
-
-      public byte[] receiveBroadcast(long time, TimeUnit unit) throws 
Exception {
-         return dequeue.poll(time, unit);
-      }
-   }
-
-   /**
-    * This class wraps a JChannel with a reference counter. The reference 
counter
-    * controls the life of the JChannel. When reference count is zero, the 
channel
-    * will be disconnected.
-    */
-   protected static class JChannelWrapper {
-
-      int refCount = 1;
-      JChannel channel;
-      String channelName;
-      final List<JGroupsReceiver> receivers = new ArrayList<>();
-
-      public JChannelWrapper(String channelName, JChannel channel) throws 
Exception {
-         this.refCount = 1;
-         this.channelName = channelName;
-         this.channel = channel;
-
-         //we always add this for the first ref count
-         channel.setReceiver(new ReceiverAdapter() {
-
-            @Override
-            public void receive(org.jgroups.Message msg) {
-               synchronized (receivers) {
-                  for (JGroupsReceiver r : receivers) {
-                     r.receive(msg);
-                  }
-               }
-            }
-         });
-      }
-
-      public synchronized void close(boolean closeWrappedChannel) {
-         refCount--;
-         if (refCount == 0) {
-            if (closeWrappedChannel) {
-               JChannelManager.closeChannel(this.channelName, channel);
-            }
-            else {
-               JChannelManager.removeChannel(this.channelName);
-            }
-            //we always remove the receiver as its no longer needed
-            channel.setReceiver(null);
-         }
-      }
-
-      public void removeReceiver(JGroupsReceiver receiver) {
-         synchronized (receivers) {
-            receivers.remove(receiver);
-         }
-      }
-
-      public synchronized void connect() throws Exception {
-         if (channel.isConnected())
-            return;
-         channel.connect(channelName);
-      }
-
-      public void addReceiver(JGroupsReceiver jGroupsReceiver) {
-         synchronized (receivers) {
-            receivers.add(jGroupsReceiver);
-         }
-      }
-
-      public void send(org.jgroups.Message msg) throws Exception {
-         channel.send(msg);
-      }
-
-      public JChannelWrapper addRef() {
-         this.refCount++;
-         return this;
-      }
-
-      @Override
-      public String toString() {
-         return "JChannelWrapper of [" + channel + "] " + refCount + " " + 
channelName;
-      }
-   }
-
-   /**
-    * This class maintain a global Map of JChannels wrapped in JChannelWrapper 
for
-    * the purpose of reference counting.
-    *
-    * Wherever a JChannel is needed it should only get it by calling the 
getChannel()
-    * method of this class. The real disconnect of channels are also done here 
only.
-    */
-   protected static class JChannelManager {
-
-      private static Map<String, JChannelWrapper> channels;
-
-      public static synchronized JChannelWrapper getJChannel(String 
channelName,
-                                                             
JGroupsBroadcastEndpoint endpoint) throws Exception {
-         if (channels == null) {
-            channels = new HashMap<>();
-         }
-         JChannelWrapper wrapper = channels.get(channelName);
-         if (wrapper == null) {
-            wrapper = new JChannelWrapper(channelName, 
endpoint.createChannel());
-            channels.put(channelName, wrapper);
-            return wrapper;
-         }
-         return wrapper.addRef();
-      }
-
-      public static synchronized void closeChannel(String channelName, 
JChannel channel) {
-         channel.setReceiver(null);
-         channel.disconnect();
-         channel.close();
-         JChannelWrapper wrapper = channels.remove(channelName);
-         if (wrapper == null) {
-            throw new IllegalStateException("Did not find channel " + 
channelName);
-         }
-      }
-
-      public static void removeChannel(String channelName) {
-         JChannelWrapper wrapper = channels.remove(channelName);
-         if (wrapper == null) {
-            throw new IllegalStateException("Did not find channel " + 
channelName);
-         }
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
index 4fbb24c..96cfee6 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
 import org.jgroups.JChannel;
 
 /**
@@ -27,8 +29,8 @@ public class JGroupsChannelBroadcastEndpoint extends 
JGroupsBroadcastEndpoint {
 
    private final JChannel jChannel;
 
-   public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String 
channelName) throws Exception {
-      super(channelName);
+   public JGroupsChannelBroadcastEndpoint(JChannelManager manager, JChannel 
jChannel, final String channelName)  {
+      super(manager, channelName);
       this.jChannel = jChannel;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
index 702cb5a..be903d3 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.jgroups.JChannel;
 
 import java.net.URL;
@@ -27,8 +28,8 @@ public final class JGroupsFileBroadcastEndpoint extends 
JGroupsBroadcastEndpoint
 
    private String file;
 
-   public JGroupsFileBroadcastEndpoint(final String file, final String 
channelName) throws Exception {
-      super(channelName);
+   public JGroupsFileBroadcastEndpoint(final JChannelManager manager, final 
String file, final String channelName) throws Exception {
+      super(manager, channelName);
       this.file = file;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
index 132ac3c..9f783e7 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
@@ -16,15 +16,20 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+
 public class JGroupsFileBroadcastEndpointFactory implements 
BroadcastEndpointFactory {
 
    private String file;
 
    private String channelName;
 
+   private final JChannelManager manager = new JChannelManager();
+
    @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
-      return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel();
+      return new JGroupsFileBroadcastEndpoint(manager, file, 
channelName).initChannel();
    }
 
    public String getFile() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
index 25cefc3..d10400a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpoint.java
@@ -16,18 +16,19 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.jgroups.JChannel;
 import org.jgroups.conf.PlainConfigurator;
 
 /**
  * This class is the implementation of ActiveMQ Artemis members discovery that 
will use JGroups.
  */
-public final class JGroupsPropertiesBroadcastEndpoint extends 
JGroupsBroadcastEndpoint {
+public final  class JGroupsPropertiesBroadcastEndpoint extends 
JGroupsBroadcastEndpoint {
 
    private String properties;
 
-   public JGroupsPropertiesBroadcastEndpoint(final String properties, final 
String channelName) throws Exception {
-      super(channelName);
+   public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager, 
final String properties, final String channelName) throws Exception {
+      super(manager, channelName);
       this.properties = properties;
    }
 
@@ -37,3 +38,4 @@ public final class JGroupsPropertiesBroadcastEndpoint extends 
JGroupsBroadcastEn
       return new JChannel(configurator);
    }
 }
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
index 4d80435..8ed03ab 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
@@ -16,15 +16,19 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
+
 public class JGroupsPropertiesBroadcastEndpointFactory implements 
BroadcastEndpointFactory {
 
    private String properties;
 
    private String channelName;
 
+   private final JChannelManager manager = new JChannelManager();
+
    @Override
    public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
-      return new JGroupsPropertiesBroadcastEndpoint(properties, 
channelName).initChannel();
+      return new JGroupsPropertiesBroadcastEndpoint(manager, properties, 
channelName).initChannel();
    }
 
    public String getProperties() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
new file mode 100644
index 0000000..296dc8a
--- /dev/null
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.jgroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint;
+import org.jboss.logging.Logger;
+
+/**
+ * This class maintain a global Map of JChannels wrapped in JChannelWrapper for
+ * the purpose of reference counting.
+ *
+ * Wherever a JChannel is needed it should only get it by calling the 
getChannel()
+ * method of this class. The real disconnect of channels are also done here 
only.
+ */
+public class JChannelManager {
+
+   private static final Logger logger = 
Logger.getLogger(JChannelManager.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   private Map<String, JChannelWrapper> channels;
+
+   public synchronized JChannelWrapper getJChannel(String channelName,
+                                                   JGroupsBroadcastEndpoint 
endpoint) throws Exception {
+      if (channels == null) {
+         channels = new HashMap<>();
+      }
+      JChannelWrapper wrapper = channels.get(channelName);
+      if (wrapper == null) {
+         wrapper = new JChannelWrapper(this, channelName, 
endpoint.createChannel());
+         channels.put(channelName, wrapper);
+         if (isTrace)
+            logger.trace("Put Channel " + channelName);
+         return wrapper;
+      }
+      if (isTrace)
+         logger.trace("Add Ref Count " + channelName);
+      return wrapper.addRef();
+   }
+
+   public synchronized void removeChannel(String channelName) {
+      channels.remove(channelName);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
new file mode 100644
index 0000000..08a8ff8
--- /dev/null
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.jgroups;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.jgroups.JChannel;
+import org.jgroups.ReceiverAdapter;
+
+/**
+ * This class wraps a JChannel with a reference counter. The reference counter
+ * controls the life of the JChannel. When reference count is zero, the channel
+ * will be disconnected.
+ */
+public class JChannelWrapper {
+   private static final Logger logger = 
Logger.getLogger(JChannelWrapper.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   private boolean connected = false;
+   int refCount = 1;
+   final JChannel channel;
+   final String channelName;
+   final List<JGroupsReceiver> receivers = new ArrayList<>();
+   private final JChannelManager manager;
+
+   public JChannelWrapper(JChannelManager manager, final String channelName, 
JChannel channel) throws Exception {
+      this.refCount = 1;
+      this.channelName = channelName;
+      this.channel = channel;
+      this.manager = manager;
+
+
+      if (channel.getReceiver() != null) {
+         logger.warn("The channel already had a receiver previously!!!!", new 
Exception("trace"));
+      }
+
+      //we always add this for the first ref count
+      channel.setReceiver(new ReceiverAdapter() {
+
+         @Override
+         public void receive(org.jgroups.Message msg) {
+            if (isTrace) {
+               logger.trace(this + ":: Wrapper received " + msg + " on channel 
" + channelName);
+            }
+            synchronized (receivers) {
+               for (JGroupsReceiver r : receivers) {
+                  r.receive(msg);
+               }
+            }
+         }
+      });
+   }
+
+   public JChannel getChannel() {
+      return channel;
+   }
+
+   public String getChannelName() {
+      return channelName;
+   }
+
+   public synchronized void close(boolean closeWrappedChannel) {
+      refCount--;
+      if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on 
channel " + channelName, new Exception("Trace"));
+      if (refCount == 0) {
+         if (closeWrappedChannel) {
+            connected = false;
+            channel.setReceiver(null);
+            logger.trace(this + "::Closing Channel: " + channelName, new 
Exception("Trace"));
+            channel.close();
+         }
+         manager.removeChannel(channelName);
+      }
+   }
+
+   public void removeReceiver(JGroupsReceiver receiver) {
+      if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on 
"  + channelName, new Exception("Trace"));
+      synchronized (receivers) {
+         receivers.remove(receiver);
+      }
+   }
+
+   public synchronized void connect() throws Exception {
+      if (isTrace) {
+         logger.trace(this + ":: Connecting " + channelName, new 
Exception("Trace"));
+      }
+
+      // It is important to check this otherwise we could reconnect an already 
connected channel
+      if (connected) {
+         return;
+      }
+
+      connected = true;
+
+      if (!channel.isConnected()) {
+         channel.connect(channelName);
+      }
+   }
+
+   public void addReceiver(JGroupsReceiver jGroupsReceiver) {
+      synchronized (receivers) {
+         if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver 
+ " on " + channelName);
+         receivers.add(jGroupsReceiver);
+      }
+   }
+
+   public void send(org.jgroups.Message msg) throws Exception {
+      if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + 
channel.isOpen() + " on channel " + channelName + " msg=" + msg);
+      channel.send(msg);
+   }
+
+   public JChannelWrapper addRef() {
+      this.refCount++;
+      if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on 
channel " + channelName);
+      return this;
+   }
+
+   @Override
+   public String toString() {
+      return super.toString() +
+         "{refCount=" + refCount +
+         ", channel=" + channel +
+         ", channelName='" + channelName + '\'' +
+         ", connected=" + connected +
+         '}';
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
new file mode 100644
index 0000000..c931661
--- /dev/null
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core.jgroups;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.logging.Logger;
+import org.jgroups.ReceiverAdapter;
+
+/**
+ * Receives messages from a JGroups channel and queues their payloads.
+ * <p>
+ * {@link #receive(org.jgroups.Message)} is the JGroups callback that enqueues each payload;
+ * consumers dequeue payloads with {@link #receiveBroadcast()} (blocking) or
+ * {@link #receiveBroadcast(long, TimeUnit)} (bounded wait).
+ */
+public class JGroupsReceiver extends ReceiverAdapter {
+
+   private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   // Unbounded queue (no capacity given); LinkedBlockingDeque does not accept null elements.
+   private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
+
+   /**
+    * Queues the payload of an incoming JGroups message.
+    * <p>
+    * A message whose {@code getBuffer()} is {@code null} (no payload) is skipped: the queue
+    * rejects {@code null} elements, so adding it would throw a {@link NullPointerException}
+    * out of this callback, and a {@code null} would be indistinguishable from the timeout
+    * result of {@link #receiveBroadcast(long, TimeUnit)} anyway.
+    *
+    * @param msg the message delivered by the channel
+    */
+   @Override
+   public void receive(org.jgroups.Message msg) {
+      if (isTrace) {
+         logger.trace("receiving message " + msg);
+      }
+      byte[] payload = msg.getBuffer();
+      if (payload == null) {
+         if (isTrace) {
+            logger.trace("ignoring message without payload " + msg);
+         }
+         return;
+      }
+      dequeue.add(payload);
+   }
+
+   /**
+    * Blocks until a payload is available and returns it.
+    *
+    * @return the next queued payload, never {@code null}
+    * @throws InterruptedException if interrupted while waiting
+    */
+   public byte[] receiveBroadcast() throws Exception {
+      byte[] bytes = dequeue.take();
+      if (isTrace) {
+         logBytes("receiveBroadcast()", bytes);
+      }
+
+      return bytes;
+   }
+
+   /**
+    * Traces how many bytes {@code methodName} returned, or that it returned none.
+    *
+    * @param methodName label used as the trace prefix
+    * @param bytes      the payload returned, possibly {@code null}
+    */
+   private void logBytes(String methodName, byte[] bytes) {
+      if (bytes != null) {
+         logger.trace(methodName + "::" + bytes.length + " bytes");
+      }
+      else {
+         logger.trace(methodName + ":: no bytes");
+      }
+   }
+
+   /**
+    * Waits up to the given time for a payload.
+    *
+    * @param time maximum time to wait
+    * @param unit unit of {@code time}
+    * @return the next queued payload, or {@code null} if none arrived before the timeout
+    * @throws InterruptedException if interrupted while waiting
+    */
+   public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
+      byte[] bytes = dequeue.poll(time, unit);
+
+      if (isTrace) {
+         logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes);
+      }
+
+      return bytes;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index e7cc55a..53ba9df 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -809,7 +809,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       // how the sendSubscription happens.
       // in case this ever changes.
       if (topology != null && !factory.waitForTopology(callTimeout, 
TimeUnit.MILLISECONDS)) {
-         factory.cleanup();
+         if (factory != null) {
+            factory.cleanup();
+         }
          throw 
ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index b84164f..175ca99 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -408,8 +408,9 @@ public class ClusterController implements ActiveMQComponent {
             }
          }
          catch (ActiveMQException e) {
-            if (!started)
+            if (!started) {
                return;
+            }
             server.getScheduledPool().schedule(this, 
serverLocator.getRetryInterval(), TimeUnit.MILLISECONDS);
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/630db2d6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 12e9298..d9a5c78 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBacku
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -53,6 +54,10 @@ import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothi
 
 public final class SharedNothingBackupActivation extends Activation {
 
+
+   private static final Logger logger = 
Logger.getLogger(SharedNothingBackupActivation.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
    //this is how we act when we start as a backup
    private ReplicaPolicy replicaPolicy;
 
@@ -129,43 +134,86 @@ public final class SharedNothingBackupActivation extends Activation {
          }
          ClusterController clusterController = 
activeMQServer.getClusterManager().getClusterController();
          
clusterController.addClusterTopologyListenerForReplication(nodeLocator);
+
+         if (isTrace) {
+            logger.trace("Waiting on cluster connection");
+         }
          //todo do we actually need to wait?
          clusterController.awaitConnectionToReplicationCluster();
 
+         if (isTrace) {
+            logger.trace("Cluster Connected");
+         }
          clusterController.addIncomingInterceptorForReplication(new 
ReplicationError(activeMQServer, nodeLocator));
 
          // nodeManager.startBackup();
-
+         if (isTrace) {
+            logger.trace("Starting backup manager");
+         }
          activeMQServer.getBackupManager().start();
 
+         if (isTrace) {
+            logger.trace("Set backup Quorum");
+         }
          replicationEndpoint.setBackupQuorum(backupQuorum);
+
          
replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
          EndpointConnector endpointConnector = new EndpointConnector();
 
+         if (isTrace) {
+            logger.trace("Starting Backup Server");
+         }
+
          
ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(),
 activeMQServer.getNodeManager().getNodeId());
          activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
 
+         if (isTrace) logger.trace("Setting server state as started");
+
          SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
          do {
-            //locate the first live server to try to replicate
-            nodeLocator.locateNode();
+
+
             if (closed) {
+               if (isTrace) {
+                  logger.trace("Activation is closed, so giving up");
+               }
                return;
             }
+
+
+            if (isTrace) {
+               logger.trace("looking up the node through 
nodeLocator.locateNode()");
+            }
+            //locate the first live server to try to replicate
+            nodeLocator.locateNode();
             Pair<TransportConfiguration, TransportConfiguration> possibleLive 
= nodeLocator.getLiveConfiguration();
             nodeID = nodeLocator.getNodeID();
+
+            if (isTrace) {
+               logger.trace("nodeID = " + nodeID);
+            }
             //in a normal (non failback) scenario if we couldn't find our live 
server we should fail
             if (!attemptFailBack) {
+               if (isTrace) {
+                  logger.trace("attemptFailback=false, nodeID=" + nodeID);
+               }
+
                //this shouldn't happen
-               if (nodeID == null)
+               if (nodeID == null) {
+                  logger.debug("Throwing a RuntimeException as nodeID==null 
ant attemptFailback=false");
                   throw new RuntimeException("Could not establish the 
connection");
+               }
                activeMQServer.getNodeManager().setNodeID(nodeID);
             }
 
             try {
+               if (isTrace) {
+                  logger.trace("Calling 
clusterController.connectToNodeInReplicatedCluster(" + possibleLive.getA() + 
")");
+               }
                clusterControl = 
clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
             }
             catch (Exception e) {
+               logger.debug(e.getMessage(), e);
                if (possibleLive.getB() != null) {
                   try {
                      clusterControl = 
clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
@@ -176,6 +224,10 @@ public final class SharedNothingBackupActivation extends Activation {
                }
             }
             if (clusterControl == null) {
+
+               if (isTrace) {
+                  logger.trace("sleeping " + 
clusterController.getRetryIntervalForReplicatedCluster() + " it should retry");
+               }
                //its ok to retry here since we haven't started replication yet
                //it may just be the server has gone since discovery
                
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
@@ -190,23 +242,43 @@ public final class SharedNothingBackupActivation extends Activation {
              * process again on the next live server.  All the action happens 
inside {@link BackupQuorum}
              */
             signal = backupQuorum.waitForStatusChange();
+
+            if (isTrace) {
+               logger.trace("Got a signal " + signal + " through 
backupQuorum.waitForStatusChange()");
+            }
+
             /**
              * replicationEndpoint will be holding lots of open files. Make 
sure they get
              * closed/sync'ed.
              */
             ActiveMQServerImpl.stopComponent(replicationEndpoint);
             // time to give up
-            if (!activeMQServer.isStarted() || signal == STOP)
+            if (!activeMQServer.isStarted() || signal == STOP) {
+               if (isTrace) {
+                  logger.trace("giving up on the activation:: 
activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + 
signal);
+               }
                return;
+            }
                // time to fail over
-            else if (signal == FAIL_OVER)
+            else if (signal == FAIL_OVER) {
+               if (isTrace) {
+                  logger.trace("signal == FAIL_OVER, breaking the loop");
+               }
                break;
+            }
                // something has gone badly run restart from scratch
             else if (signal == 
SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) {
+               if (isTrace) {
+                  logger.trace("Starting a new thread to stop the server!");
+               }
+
                Thread startThread = new Thread(new Runnable() {
                   @Override
                   public void run() {
                      try {
+                        if (isTrace) {
+                           logger.trace("Calling activeMQServer.stop()");
+                        }
                         activeMQServer.stop();
                      }
                      catch (Exception e) {
@@ -227,17 +299,30 @@ public final class SharedNothingBackupActivation extends Activation {
             }
          } while (signal == 
SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
 
+         if (isTrace) {
+            logger.trace("Activation loop finished, current signal = " + 
signal);
+         }
+
          
activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);
 
          if (!isRemoteBackupUpToDate()) {
+            logger.debug("throwing exception for !isRemoteBackupUptoDate");
             throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync();
          }
 
+
+         if (isTrace) {
+            logger.trace("setReplicaPolicy::" + replicaPolicy);
+         }
+
          replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
          activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());
+
          synchronized (activeMQServer) {
-            if (!activeMQServer.isStarted())
+            if (!activeMQServer.isStarted()) {
+               logger.trace("Server is stopped, giving up right before 
becomingLive");
                return;
+            }
             ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
             activeMQServer.getNodeManager().stopBackup();
             activeMQServer.getStorageManager().start();
@@ -262,6 +347,9 @@ public final class SharedNothingBackupActivation extends Activation {
          }
       }
       catch (Exception e) {
+         if (isTrace) {
+            logger.trace(e.getMessage() + ", serverStarted=" + 
activeMQServer.isStarted(), e);
+         }
          if ((e instanceof InterruptedException || e instanceof 
IllegalStateException) && !activeMQServer.isStarted())
             // do not log these errors if the server is being stopped.
             return;
@@ -374,8 +462,10 @@ public final class SharedNothingBackupActivation extends Activation {
     * @throws ActiveMQException
     */
    public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping 
finalMessage) throws ActiveMQException {
-      ActiveMQServerLogger.LOGGER.trace("Remote fail-over, got message=" + 
finalMessage + ", backupUpToDate=" +
-                                           backupUpToDate);
+      if (isTrace) {
+         logger.trace("Remote fail-over, got message=" + finalMessage + ", 
backupUpToDate=" +
+                         backupUpToDate);
+      }
       if (!activeMQServer.getHAPolicy().isBackup() || 
activeMQServer.getHAPolicy().isSharedStore()) {
          throw new ActiveMQInternalErrorException();
       }

Reply via email to