Repository: activemq-artemis
Updated Branches:
  refs/heads/master adeaa66a1 -> 1f5f45ca9


Add Unique ClientID on Server


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/74742dcb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/74742dcb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/74742dcb

Branch: refs/heads/master
Commit: 74742dcb66085b5a9169ccdefe1e6806c69d111c
Parents: adeaa66
Author: Martyn Taylor <mtay...@redhat.com>
Authored: Thu Sep 15 14:15:54 2016 +0100
Committer: Martyn Taylor <mtay...@redhat.com>
Committed: Fri Sep 16 16:05:20 2016 +0100

----------------------------------------------------------------------
 .../protocol/proton/ProtonProtocolManager.java  |   2 +-
 .../plug/ActiveMQProtonConnectionCallback.java  | 107 +++++++++----------
 .../org/proton/plug/AMQPConnectionCallback.java |   2 -
 .../server/ProtonServerConnectionContext.java   |   5 -
 .../context/AbstractConnectionContextTest.java  |   4 -
 .../proton/plug/test/invm/ProtonINVMSPI.java    |  10 +-
 .../plug/test/minimalclient/AMQPClientSPI.java  |   4 -
 .../minimalserver/MinimalConnectionSPI.java     |   4 -
 .../artemis/core/server/ActiveMQServer.java     |   4 +
 .../core/server/impl/ActiveMQServerImpl.java    |  23 ++++
 .../tests/integration/proton/ProtonTest.java    |  10 ++
 11 files changed, 88 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 3567307..a2563a1 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -104,7 +104,7 @@ public class ProtonProtocolManager implements 
ProtocolManager<Interceptor>, Noti
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection remotingConnection) {
-      ActiveMQProtonConnectionCallback connectionCallback = new 
ActiveMQProtonConnectionCallback(this, remotingConnection, 
server.getExecutorFactory().getExecutor());
+      ActiveMQProtonConnectionCallback connectionCallback = new 
ActiveMQProtonConnectionCallback(this, remotingConnection, 
server.getExecutorFactory().getExecutor(), server);
       long ttl = ActiveMQClient.DEFAULT_CONNECTION_TTL;
 
       if (server.getConfiguration().getConnectionTTLOverride() != -1) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index 707b312..ea66b01 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -16,26 +16,29 @@
  */
 package org.apache.activemq.artemis.core.protocol.proton.plug;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import 
org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
 import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
-import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ReusableLatch;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.jboss.logging.Logger;
@@ -46,13 +49,14 @@ import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
 import org.proton.plug.handler.ExtCapability;
 import org.proton.plug.sasl.AnonymousServerSASL;
-import org.proton.plug.sasl.PlainSASLResult;
 
 import static org.proton.plug.AmqpSupport.CONTAINER_ID;
 import static org.proton.plug.AmqpSupport.INVALID_FIELD;
 import static 
org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
 
-public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback {
+public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback, FailureListener, CloseListener {
+   private static final List<String> connectedContainers = 
Collections.synchronizedList(new ArrayList());
+
    private static final Logger log = 
Logger.getLogger(ActiveMQProtonConnectionCallback.class);
 
    private final ProtonProtocolManager manager;
@@ -67,14 +71,20 @@ public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback
 
    private final Executor closeExecutor;
 
-   private ServerSession internalSession;
+   private String remoteContainerId;
+
+   private AtomicBoolean registeredConnectionId = new AtomicBoolean(false);
+
+   private ActiveMQServer server;
 
    public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
                                            Connection connection,
-                                           Executor closeExecutor) {
+                                           Executor closeExecutor,
+                                           ActiveMQServer server) {
       this.manager = manager;
       this.connection = connection;
       this.closeExecutor = closeExecutor;
+      this.server = server;
    }
 
    @Override
@@ -106,41 +116,9 @@ public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback
    }
 
    @Override
-   public void init() throws Exception {
-      //This internal core session is used to represent the connection
-      //in core server. It is used to identify unique clientIDs.
-      //Note the Qpid-JMS client does create a initial session
-      //for each connection. However is comes in as a Begin
-      //After Open. This makes it unusable for this purpose
-      //as we need to decide the uniqueness in response to
-      //Open, and the checking Uniqueness and adding the unique
-      //client-id to server need to be atomic.
-      if (internalSession == null) {
-         SASLResult saslResult = amqpConnection.getSASLResult();
-         String user = null;
-         String passcode = null;
-         if (saslResult != null) {
-            user = saslResult.getUser();
-            if (saslResult instanceof PlainSASLResult) {
-               passcode = ((PlainSASLResult) saslResult).getPassword();
-            }
-         }
-         internalSession = 
manager.getServer().createSession(createInternalSessionName(), user, passcode, 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // 
RemotingConnection remotingConnection,
-                 false,
-                 false,
-                 false,
-                 false,
-                 null, (SessionCallback) 
createSessionCallback(this.amqpConnection), true);
-      }
-   }
-
-   @Override
    public void close() {
-      try {
-         internalSession.close(false);
-      }
-      catch (Exception e) {
-         log.error("error closing internal session", e);
+      if (registeredConnectionId.getAndSet(false)) {
+         server.removeClientConnection(remoteContainerId);
       }
       connection.close();
       amqpConnection.close();
@@ -170,6 +148,7 @@ public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback
    }
 
    public void setProtonConnectionDelegate(ActiveMQProtonRemotingConnection 
protonConnectionDelegate) {
+
       this.protonConnectionDelegate = protonConnectionDelegate;
    }
 
@@ -209,25 +188,35 @@ public class ActiveMQProtonConnectionCallback implements 
AMQPConnectionCallback
 
    @Override
    public boolean validateConnection(org.apache.qpid.proton.engine.Connection 
connection, SASLResult saslResult) {
-      String remote = connection.getRemoteContainer();
-
-      if (ExtCapability.needUniqueConnection(connection)) {
-         if 
(!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY,
 remote)) {
-            //https://issues.apache.org/jira/browse/ARTEMIS-728
-            Map<Symbol, Object> connProp = new HashMap<>();
-            connProp.put(CONNECTION_OPEN_FAILED, "true");
-            connection.setProperties(connProp);
-            connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
-            Map<Symbol, Symbol> info = new HashMap<>();
-            info.put(INVALID_FIELD, CONTAINER_ID);
-            connection.getCondition().setInfo(info);
-            return false;
-         }
+      remoteContainerId = connection.getRemoteContainer();
+      boolean idOK = server.addClientConnection(remoteContainerId, 
ExtCapability.needUniqueConnection(connection));
+      if (!idOK) {
+         //https://issues.apache.org/jira/browse/ARTEMIS-728
+         Map<Symbol, Object> connProp = new HashMap<>();
+         connProp.put(CONNECTION_OPEN_FAILED, "true");
+         connection.setProperties(connProp);
+         connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
+         Map<Symbol, Symbol> info = new HashMap<>();
+         info.put(INVALID_FIELD, CONTAINER_ID);
+         connection.getCondition().setInfo(info);
+         return false;
       }
+      registeredConnectionId.set(true);
       return true;
    }
 
-   private String createInternalSessionName() {
-      return "amqp:" + UUIDGenerator.getInstance().generateStringUUID();
+   @Override
+   public void connectionClosed() {
+      close();
+   }
+
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean 
failedOver) {
+      close();
+   }
+
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean 
failedOver, String scaleDownTargetNodeID) {
+      close();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
index df14b0f..15a3246 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -21,8 +21,6 @@ import org.apache.qpid.proton.engine.Connection;
 
 public interface AMQPConnectionCallback {
 
-   void init() throws Exception;
-
    void close();
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
index efaaed4..3386732 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
@@ -64,11 +64,6 @@ public class ProtonServerConnectionContext extends 
AbstractConnectionContext imp
    }
 
    @Override
-   protected void initInternal() throws Exception {
-      connectionCallback.init();
-   }
-
-   @Override
    protected void remoteLinkOpened(Link link) throws Exception {
 
       ProtonServerSessionContext protonSession = (ProtonServerSessionContext) 
getSessionExtension(link.getSession());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index 91af8f5..da7b617 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -73,10 +73,6 @@ public class AbstractConnectionContextTest {
    private class TestConnectionCallback implements AMQPConnectionCallback {
 
       @Override
-      public void init() throws Exception {
-      }
-
-      @Override
       public void close() {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index bf83f8a..5de6e9d 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -37,6 +37,8 @@ import org.proton.plug.util.ByteUtil;
 public class ProtonINVMSPI implements AMQPConnectionCallback {
 
    private static final Logger log = Logger.getLogger(ProtonINVMSPI.class);
+
+
    AMQPConnectionContext returningConnection;
 
    ProtonServerConnectionContext serverConnection = new 
ProtonServerConnectionContext(new ReturnSPI(), 
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()),
 null);
@@ -61,10 +63,6 @@ public class ProtonINVMSPI implements AMQPConnectionCallback 
{
    }
 
    @Override
-   public void init() throws Exception {
-   }
-
-   @Override
    public void close() {
       mainExecutor.shutdown();
    }
@@ -137,10 +135,6 @@ public class ProtonINVMSPI implements 
AMQPConnectionCallback {
    class ReturnSPI implements AMQPConnectionCallback {
 
       @Override
-      public void init() throws Exception {
-      }
-
-      @Override
       public void close() {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
index fbdee59..be1571c 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -55,10 +55,6 @@ public class AMQPClientSPI implements AMQPConnectionCallback 
{
    }
 
    @Override
-   public void init() throws Exception {
-   }
-
-   @Override
    public void close() {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
index 055b29d..1b9c919 100644
--- 
a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ 
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -51,10 +51,6 @@ public class MinimalConnectionSPI implements 
AMQPConnectionCallback {
    ExecutorService executorService = 
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
 
    @Override
-   public void init() throws Exception {
-   }
-
-   @Override
    public void close() {
       executorService.shutdown();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 0842c0d..ac65335 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -397,4 +397,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
    void setMBeanServer(MBeanServer mBeanServer);
 
    void addExternalComponent(ActiveMQComponent externalComponent);
+
+   boolean addClientConnection(String clientId, boolean unique);
+
+   void removeClientConnection(String clientId);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index f5b9f26..7e32f43 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -309,6 +310,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    private Date startDate;
 
    private final List<ActiveMQComponent> externalComponents = new 
ArrayList<>();
+
+   private final Map<String, AtomicInteger> connectedClientIds = new 
ConcurrentHashMap();
+
    // Constructors
    // 
---------------------------------------------------------------------------------
 
@@ -2396,6 +2400,25 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
       return new Date().getTime() - startDate.getTime();
    }
 
+   public boolean addClientConnection(String clientId, boolean unique) {
+      final AtomicInteger i = connectedClientIds.putIfAbsent(clientId, new 
AtomicInteger(1));
+      if (i != null) {
+         if (unique && i.get() != 0) {
+            return false;
+         }
+         else if (i.incrementAndGet() > 0) {
+            connectedClientIds.put(clientId, i);
+         }
+      }
+      return true;
+   }
+
+   public void removeClientConnection(String clientId) {
+      AtomicInteger i = connectedClientIds.get(clientId);
+      if (i != null && i.decrementAndGet() == 0) {
+         connectedClientIds.remove(clientId);
+      }
+   }
 
    private final class ActivationThread extends Thread {
       final Runnable runnable;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/74742dcb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 245c6b9..193b46b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -1561,6 +1561,16 @@ public class ProtonTest extends ProtonTestBase {
          testConn2.close();
       }
 
+      try {
+         testConn1 = createConnection(false);
+         testConn2 = createConnection(false);
+         testConn1.setClientID("client-id1");
+         testConn2.setClientID("client-id2");
+      }
+      finally {
+         testConn1.close();
+         testConn2.close();
+      }
    }
 
    private javax.jms.Queue createQueue(String address) throws Exception {

Reply via email to