fixing SlowConsumerDetection

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/20476a9b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/20476a9b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/20476a9b

Branch: refs/heads/refactor-openwire
Commit: 20476a9b4f8fa30873a3f911406f1901ff3ae290
Parents: 50dfa49
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Thu Feb 25 14:40:04 2016 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Feb 25 18:10:23 2016 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/OpenWireConnection.java | 12 +++++++++---
 .../protocol/openwire/OpenWireProtocolManager.java | 17 ++++-------------
 .../core/server/impl/ServerConsumerImpl.java       |  4 +++-
 3 files changed, 16 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20476a9b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6f2e3be..dc2a8a6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -204,6 +204,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          boolean responseRequired = command.isResponseRequired();
          int commandId = command.getCommandId();
+
+
+         // TODO-NOW: the server should send packets to the client based on the requested times
+         //           need to look at what Andy did on AMQP
+
          // the connection handles pings, negotiations directly.
          // and delegate all other commands to manager.
          if (command.getClass() == KeepAliveInfo.class) {
@@ -1196,12 +1201,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processMessageDispatch(MessageDispatch arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         return null;
       }
 
       @Override
       public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         return null;
       }
 
       @Override
@@ -1222,7 +1227,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processProducerAck(ProducerAck arg0) throws Exception {
-         throw new IllegalStateException("not implemented! ");
+         // a broker doesn't do producers.. this shouldn't happen
+         return null;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20476a9b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index bdf27f8..514a2b9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,14 +17,12 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -44,7 +42,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -60,7 +57,6 @@ import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageDispatch;
@@ -91,21 +87,14 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
 
    private OpenWireFormatFactory wireFactory;
 
-   private boolean tightEncodingEnabled = true;
-
    private boolean prefixPacketSize = true;
 
    private BrokerId brokerId;
    protected final ProducerId advisoryProducerId = new ProducerId();
 
-   // from broker
-   protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>());
-
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
-   protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>();
-
-   // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id)
+   // TODO-NOW: this can probably go away
    private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>();
 
    private String brokerName;
@@ -133,11 +122,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       // preferred prop, should be done via config
       wireFactory.setCacheEnabled(false);
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
-      ManagementService service = server.getManagementService();
       scheduledPool = server.getScheduledPool();
 
       final ClusterManager clusterManager = this.server.getClusterManager();
+
+      // TODO-NOW: use a property name for the cluster connection
       ClusterConnection cc = clusterManager.getDefaultConnection(null);
+
       if (cc != null) {
          cc.addClusterTopologyListener(this);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20476a9b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 2a11d6a..ab9dec9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -234,7 +234,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    @Override
    public void fireSlowConsumer() {
-      slowConsumerListener.onSlowConsumer(this);
+      if (slowConsumerListener != null) {
+         slowConsumerListener.onSlowConsumer(this);
+      }
    }
 
    @Override

Reply via email to