http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java index e47ac01..2a04353 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ServerLocatorImpl.java @@ -45,21 +45,21 @@ import org.apache.activemq.api.core.Pair; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.client.ClientSessionFactory; import org.apache.activemq.api.core.client.ClusterTopologyListener; -import org.apache.activemq.api.core.client.HornetQClient; +import org.apache.activemq.api.core.client.ActiveMQClient; import org.apache.activemq.api.core.client.TopologyMember; import org.apache.activemq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy; -import org.apache.activemq.core.client.HornetQClientLogger; -import org.apache.activemq.core.client.HornetQClientMessageBundle; +import org.apache.activemq.core.client.ActiveMQClientLogger; +import org.apache.activemq.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.core.cluster.DiscoveryEntry; import org.apache.activemq.core.cluster.DiscoveryGroup; import org.apache.activemq.core.cluster.DiscoveryListener; -import org.apache.activemq.core.protocol.core.impl.HornetQClientProtocolManagerFactory; +import org.apache.activemq.core.protocol.core.impl.ActiveMQClientProtocolManagerFactory; import org.apache.activemq.core.remoting.FailureListener; import org.apache.activemq.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.spi.core.remoting.ClientProtocolManagerFactory; import org.apache.activemq.spi.core.remoting.Connector; import org.apache.activemq.utils.ClassloadingUtil; -import org.apache.activemq.utils.HornetQThreadFactory; +import org.apache.activemq.utils.ActiveMQThreadFactory; import org.apache.activemq.utils.UUIDGenerator; /** @@ -84,7 +84,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery // This is the default value - private ClientProtocolManagerFactory protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance(); + private ClientProtocolManagerFactory protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(); private final boolean ha; @@ -267,7 +267,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (globalThreadPool == null) { - ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader()); + ThreadFactory factory = new ActiveMQThreadFactory("ActiveMQ-client-global-threads", true, getThisClassLoader()); globalThreadPool = Executors.newCachedThreadPool(factory); } @@ -279,11 +279,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (globalScheduledThreadPool == null) { - ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads", + ThreadFactory factory = new ActiveMQThreadFactory("ActiveMQ-client-global-scheduled-threads", true, getThisClassLoader()); - globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, + globalScheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, factory); } @@ -307,7 +307,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { this.shutdownPool = true; - ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this), + ThreadFactory factory = new ActiveMQThreadFactory("ActiveMQ-client-factory-threads-" + System.identityHashCode(this), true, getThisClassLoader()); @@ -320,7 +320,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory); } - factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this), + factory = new ActiveMQThreadFactory("ActiveMQ-client-factory-pinger-threads-" + System.identityHashCode(this), true, getThisClassLoader()); @@ -386,7 +386,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery catch (Exception e) { state = null; - throw HornetQClientMessageBundle.BUNDLE.failedToInitialiseSessionFactory(e); + throw ActiveMQClientMessageBundle.BUNDLE.failedToInitialiseSessionFactory(e); } } } @@ -415,65 +415,65 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery this.nodeID = UUIDGenerator.getInstance().generateStringUUID(); - clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; + clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD; - connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL; + connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL; - callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT; + callTimeout = ActiveMQClient.DEFAULT_CALL_TIMEOUT; - callFailoverTimeout = HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT; + callFailoverTimeout = ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT; - minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; + minLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; - consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE; + consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; - consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE; + consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE; - confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; + confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; - producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE; + producerWindowSize = ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE; - producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE; + producerMaxRate = ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE; - blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE; + blockOnAcknowledge = ActiveMQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE; - blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND; + blockOnDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_DURABLE_SEND; - blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND; + blockOnNonDurableSend = ActiveMQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND; - autoGroup = HornetQClient.DEFAULT_AUTO_GROUP; + autoGroup = ActiveMQClient.DEFAULT_AUTO_GROUP; - preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE; + preAcknowledge = ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE; - ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE; + ackBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE; - connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME; + connectionLoadBalancingPolicyClassName = ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME; - useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS; + useGlobalPools = ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS; - scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE; + scheduledThreadPoolMaxSize = ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE; - threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE; + threadPoolMaxSize = ActiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE; - retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL; + retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL; - retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER; + retryIntervalMultiplier = ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER; - maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL; + maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL; - reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS; + reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS; - initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS; + initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS; - failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION; + failoverOnInitialConnection = ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION; - cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; + cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; - initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; + initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE; - cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; + cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT; - compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; + compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; clusterConnection = false; } @@ -633,7 +633,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (!isClosed()) { - HornetQClientLogger.LOGGER.errorConnectingToNodes(e); + ActiveMQClientLogger.LOGGER.errorConnectingToNodes(e); } } } @@ -652,7 +652,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (protocolManagerFactory == null) { // this could happen over serialization from older versions - protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance(); + protocolManagerFactory = ActiveMQClientProtocolManagerFactory.getInstance(); } return protocolManagerFactory; } @@ -711,9 +711,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { TopologyMember topologyMember = topology.getMember(nodeID); - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe()); + ActiveMQClientLogger.LOGGER.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe()); } if (topologyMember == null) @@ -856,7 +856,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (!ok) { - throw HornetQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast(); + throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast(); } } @@ -873,7 +873,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery TransportConfiguration tc = selectConnector(); if (tc == null) { - throw HornetQClientMessageBundle.BUNDLE.noTCForSessionFactory(); + throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory(); } // try each factory in the list until we find one which works @@ -917,11 +917,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (topologyArray != null && attempts == topologyArray.length) { - throw HornetQClientMessageBundle.BUNDLE.cannotConnectToServers(); + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); } if (topologyArray == null && attempts == this.getNumInitialConnectors()) { - throw HornetQClientMessageBundle.BUNDLE.cannotConnectToServers(); + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); } } retry = true; @@ -962,7 +962,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (factory != null) factory.cleanup(); - throw HornetQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); + throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); } addFactory(factory); @@ -1465,9 +1465,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (state == STATE.CLOSED) { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug(this + " is already closed when calling closed"); + ActiveMQClientLogger.LOGGER.debug(this + " is already closed when calling closed"); } return; } @@ -1495,7 +1495,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } catch (Exception e) { - HornetQClientLogger.LOGGER.failedToStopDiscovery(e); + ActiveMQClientLogger.LOGGER.failedToStopDiscovery(e); } } } @@ -1551,7 +1551,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { - HornetQClientLogger.LOGGER.timedOutWaitingForTermination(); + ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination(); } } catch (InterruptedException e) @@ -1568,7 +1568,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { - HornetQClientLogger.LOGGER.timedOutWaitingForScheduledPoolTermination(); + ActiveMQClientLogger.LOGGER.timedOutWaitingForScheduledPoolTermination(); } } catch (InterruptedException e) @@ -1598,9 +1598,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return; } - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace")); + ActiveMQClientLogger.LOGGER.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace")); } topology.removeMember(eventTime, nodeID); @@ -1641,9 +1641,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery final Pair<TransportConfiguration, TransportConfiguration> connectorPair, final boolean last) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace")); + ActiveMQClientLogger.LOGGER.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace")); } TopologyMemberImpl member = new TopologyMemberImpl(nodeID, backupGroupName, scaleDownGroupName, connectorPair.getA(), connectorPair.getB()); @@ -1757,7 +1757,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } catch (ActiveMQException e) { - HornetQClientLogger.LOGGER.errorConnectingToNodes(e); + ActiveMQClientLogger.LOGGER.errorConnectingToNodes(e); } } }; @@ -1879,9 +1879,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery retryNumber++; for (Connector conn : connectors) { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug(this + "::Submitting connect towards " + conn); + ActiveMQClientLogger.LOGGER.debug(this + "::Submitting connect towards " + conn); } csf = conn.tryConnect(); @@ -1904,7 +1904,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery catch (Exception e) { // There isn't much to be done if this happens here - HornetQClientLogger.LOGGER.errorStartingLocator(e); + ActiveMQClientLogger.LOGGER.errorStartingLocator(e); } } } @@ -1922,9 +1922,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } }); - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug("Returning " + csf + + ActiveMQClientLogger.LOGGER.debug("Returning " + csf + " after " + retryNumber + " retries on StaticConnector " + @@ -1949,15 +1949,15 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (isClosed() || skipWarnings) return null; - HornetQClientLogger.LOGGER.debug("Rejected execution", e); + ActiveMQClientLogger.LOGGER.debug("Rejected execution", e); throw e; } catch (Exception e) { if (isClosed() || skipWarnings) return null; - HornetQClientLogger.LOGGER.errorConnectingToNodes(e); - throw HornetQClientMessageBundle.BUNDLE.cannotConnectToStaticConnectors(e); + ActiveMQClientLogger.LOGGER.errorConnectingToNodes(e); + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToStaticConnectors(e); } if (isClosed() || skipWarnings) @@ -1965,8 +1965,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return null; } - HornetQClientLogger.LOGGER.errorConnectingToNodes(traceException); - throw HornetQClientMessageBundle.BUNDLE.cannotConnectToStaticConnectors2(); + ActiveMQClientLogger.LOGGER.errorConnectingToNodes(traceException); + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToStaticConnectors2(); } private synchronized void createConnectors() @@ -2024,7 +2024,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { if (!isClosed() && finalizeCheck) { - HornetQClientLogger.LOGGER.serverLocatorNotClosed(traceException, System.identityHashCode(this)); + ActiveMQClientLogger.LOGGER.serverLocatorNotClosed(traceException, System.identityHashCode(this)); if (ServerLocatorImpl.finalizeCallback != null) { @@ -2051,9 +2051,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public ClientSessionFactory tryConnect() throws ActiveMQException { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug(this + "::Trying to connect to " + factory); + ActiveMQClientLogger.LOGGER.debug(this + "::Trying to connect to " + factory); } try { @@ -2075,7 +2075,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } catch (ActiveMQException e) { - HornetQClientLogger.LOGGER.debug(this + "::Exception on establish connector initial connection", e); + ActiveMQClientLogger.LOGGER.debug(this + "::Exception on establish connector initial connection", e); return null; } }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/Topology.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/Topology.java b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/Topology.java index 89973e2..caefb41 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/Topology.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/Topology.java @@ -25,7 +25,7 @@ import java.util.concurrent.Executor; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.client.ClusterTopologyListener; -import org.apache.activemq.core.client.HornetQClientLogger; +import org.apache.activemq.core.client.ActiveMQClientLogger; import org.apache.activemq.spi.core.remoting.Connector; /** @@ -64,9 +64,9 @@ public final class Topology implements Serializable public Topology(final Object owner) { this.owner = owner; - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", + ActiveMQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace")); } } @@ -86,9 +86,9 @@ public final class Topology implements Serializable public void addClusterTopologyListener(final ClusterTopologyListener listener) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + "::Adding topology listener " + listener, new Exception("Trace")); + ActiveMQClientLogger.LOGGER.trace(this + "::Adding topology listener " + listener, new Exception("Trace")); } synchronized (topologyListeners) { @@ -99,9 +99,9 @@ public final class Topology implements Serializable public void removeClusterTopologyListener(final ClusterTopologyListener listener) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + "::Removing topology listener " + listener, new Exception("Trace")); + ActiveMQClientLogger.LOGGER.trace(this + "::Removing topology listener " + listener, new Exception("Trace")); } synchronized (topologyListeners) { @@ -114,9 +114,9 @@ public final class Topology implements Serializable { synchronized (this) { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug(this + "::node " + nodeId + "=" + memberInput); + ActiveMQClientLogger.LOGGER.debug(this + "::node " + nodeId + "=" + memberInput); } memberInput.setUniqueEventID(System.currentTimeMillis()); topology.remove(nodeId); @@ -146,9 +146,9 @@ public final class Topology implements Serializable public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput) { final String nodeId = memberInput.getNodeId(); - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput); + ActiveMQClientLogger.LOGGER.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput); } synchronized (this) @@ -156,9 +156,9 @@ public final class Topology implements Serializable TopologyMemberImpl currentMember = getMember(nodeId); if (currentMember == null) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, + ActiveMQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, new Exception("trace")); } @@ -191,7 +191,7 @@ public final class Topology implements Serializable Long deleteTme = getMapDelete().get(nodeId); if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme) { - HornetQClientLogger.LOGGER.debug("Update uniqueEvent=" + uniqueEventID + + ActiveMQClientLogger.LOGGER.debug("Update uniqueEvent=" + uniqueEventID + ", nodeId=" + nodeId + ", memberInput=" + @@ -206,9 +206,9 @@ public final class Topology implements Serializable if (currentMember == null) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, + ActiveMQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, new Exception("trace")); } memberInput.setUniqueEventID(uniqueEventID); @@ -232,9 +232,9 @@ public final class Topology implements Serializable newMember.setBackup(currentMember.getBackup()); } - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" + + ActiveMQClientLogger.LOGGER.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" + currentMember + ", memberInput=" + memberInput + "newMember=" + newMember, new Exception("trace")); @@ -267,9 +267,9 @@ public final class Topology implements Serializable { final ArrayList<ClusterTopologyListener> copy = copyListeners(); - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements"); + ActiveMQClientLogger.LOGGER.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements"); } if (copy.size() > 0) @@ -280,9 +280,9 @@ public final class Topology implements Serializable { for (ClusterTopologyListener listener : copy) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(Topology.this + " informing " + + ActiveMQClientLogger.LOGGER.trace(Topology.this + " informing " + listener + " about node up = " + nodeId + @@ -296,7 +296,7 @@ public final class Topology implements Serializable } catch (Throwable e) { - HornetQClientLogger.LOGGER.errorSendingTopology(e); + ActiveMQClientLogger.LOGGER.errorSendingTopology(e); } } } @@ -328,7 +328,7 @@ public final class Topology implements Serializable { if (member.getUniqueEventID() > uniqueEventID) { - HornetQClientLogger.LOGGER.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call"); + ActiveMQClientLogger.LOGGER.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call"); member = null; } else @@ -339,9 +339,9 @@ public final class Topology implements Serializable } } - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace("removeMember " + this + + ActiveMQClientLogger.LOGGER.trace("removeMember " + this + " removing nodeID=" + nodeId + ", result=" + @@ -360,9 +360,9 @@ public final class Topology implements Serializable { for (ClusterTopologyListener listener : copy) { - if (HornetQClientLogger.LOGGER.isTraceEnabled()) + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - HornetQClientLogger.LOGGER.trace(this + " informing " + listener + " about node down = " + nodeId); + ActiveMQClientLogger.LOGGER.trace(this + " informing " + listener + " about node down = " + nodeId); } try { @@ -370,7 +370,7 @@ public final class Topology implements Serializable } catch (Exception e) { - HornetQClientLogger.LOGGER.errorSendingTopologyNodedown(e); + ActiveMQClientLogger.LOGGER.errorSendingTopologyNodedown(e); } } } @@ -394,9 +394,9 @@ public final class Topology implements Serializable public synchronized void sendTopology(final ClusterTopologyListener listener) { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug(this + " is sending topology to " + listener); + ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener); } execute(new Runnable() @@ -414,9 +414,9 @@ public final class Topology implements Serializable for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet()) { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { - HornetQClientLogger.LOGGER.debug(Topology.this + " sending " + + ActiveMQClientLogger.LOGGER.debug(Topology.this + " sending " + entry.getKey() + " / " + entry.getValue().getConnector() + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java b/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java index ce4dbd2..1ee0662 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/cluster/DiscoveryGroup.java @@ -27,8 +27,8 @@ import org.apache.activemq.api.core.BroadcastEndpointFactory; import org.apache.activemq.api.core.SimpleString; import org.apache.activemq.api.core.TransportConfiguration; import org.apache.activemq.api.core.management.CoreNotificationType; -import org.apache.activemq.core.client.HornetQClientLogger; -import org.apache.activemq.core.server.HornetQComponent; +import org.apache.activemq.core.client.ActiveMQClientLogger; +import org.apache.activemq.core.server.ActiveMQComponent; import org.apache.activemq.core.server.management.Notification; import org.apache.activemq.core.server.management.NotificationService; import org.apache.activemq.utils.TypedProperties; @@ -46,9 +46,9 @@ import org.apache.activemq.utils.TypedProperties; * @author <a href="mailto:[email protected]">Tim Fox</a> * @author Clebert Suconic */ -public final class DiscoveryGroup implements HornetQComponent +public final class DiscoveryGroup implements ActiveMQComponent { - private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled(); private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>(); @@ -106,7 +106,7 @@ public final class DiscoveryGroup implements HornetQComponent started = true; - thread = new Thread(new DiscoveryRunnable(), "hornetq-discovery-group-thread-" + name); + thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name); thread.setDaemon(true); @@ -147,7 +147,7 @@ public final class DiscoveryGroup implements HornetQComponent } catch (Exception e1) { - HornetQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1); + ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1); } try @@ -156,7 +156,7 @@ public final class DiscoveryGroup implements HornetQComponent thread.join(10000); if (thread.isAlive()) { - HornetQClientLogger.LOGGER.timedOutStoppingDiscovery(); + ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery(); } } catch (InterruptedException e) @@ -177,7 +177,7 @@ public final class DiscoveryGroup implements HornetQComponent } catch (Exception e) { - HornetQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e); + ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e); } } } @@ -252,7 +252,7 @@ public final class DiscoveryGroup implements HornetQComponent { if (!currentUniqueID.equals(uniqueID)) { - HornetQClientLogger.LOGGER.multipleServersBroadcastingSameNode(originatingNodeID); + ActiveMQClientLogger.LOGGER.multipleServersBroadcastingSameNode(originatingNodeID); uniqueIDMap.put(originatingNodeID, uniqueID); } } @@ -278,7 +278,7 @@ public final class DiscoveryGroup implements HornetQComponent { // This is totally unexpected, so I'm not even bothering on creating // a log entry for that - HornetQClientLogger.LOGGER.warn("Unexpected null data received from DiscoveryEndpoint"); + ActiveMQClientLogger.LOGGER.warn("Unexpected null data received from DiscoveryEndpoint"); } break; } @@ -291,7 +291,7 @@ public final class DiscoveryGroup implements HornetQComponent } else { - HornetQClientLogger.LOGGER.errorReceivingPAcketInDiscovery(e); + ActiveMQClientLogger.LOGGER.errorReceivingPAcketInDiscovery(e); } } @@ -346,10 +346,10 @@ public final class DiscoveryGroup implements HornetQComponent { if (isTrace) { - HornetQClientLogger.LOGGER.trace("Connectors changed on Discovery:"); + ActiveMQClientLogger.LOGGER.trace("Connectors changed on Discovery:"); for (DiscoveryEntry connector : connectors.values()) { - HornetQClientLogger.LOGGER.trace(connector); + ActiveMQClientLogger.LOGGER.trace(connector); } } callListeners(); @@ -365,7 +365,7 @@ public final class DiscoveryGroup implements HornetQComponent } catch (Exception e) { - HornetQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); + ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); } } @@ -397,7 +397,7 @@ public final class DiscoveryGroup implements HornetQComponent catch (Throwable t) { // Catch it so exception doesn't prevent other listeners from running - HornetQClientLogger.LOGGER.failedToCallListenerInDiscovery(t); + ActiveMQClientLogger.LOGGER.failedToCallListenerInDiscovery(t); } } } @@ -419,7 +419,7 @@ public final class DiscoveryGroup implements HornetQComponent { if (isTrace) { - HornetQClientLogger.LOGGER.trace("Timed out node on discovery:" + entry.getValue()); + ActiveMQClientLogger.LOGGER.trace("Timed out node on discovery:" + entry.getValue()); } iter.remove(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/exception/ActiveMQXAException.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/exception/ActiveMQXAException.java b/activemq-core-client/src/main/java/org/apache/activemq/core/exception/ActiveMQXAException.java new file mode 100644 index 0000000..d25a364 --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/exception/ActiveMQXAException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.core.exception; + +import javax.transaction.xa.XAException; + +/** + * A ActiveMQXAException + * + * @author Tim Fox + * + * + */ +public class ActiveMQXAException extends XAException +{ + private static final long serialVersionUID = 6535914602965015803L; + + public ActiveMQXAException(final int errorCode, final String message) + { + super(message); + + this.errorCode = errorCode; + } + + public ActiveMQXAException(final int errorCode) + { + super(errorCode); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/exception/HornetQXAException.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/exception/HornetQXAException.java b/activemq-core-client/src/main/java/org/apache/activemq/core/exception/HornetQXAException.java deleted file mode 100644 index e721d3e..0000000 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/exception/HornetQXAException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.apache.activemq.core.exception; - -import javax.transaction.xa.XAException; - -/** - * A HornetQXAException - * - * @author Tim Fox - * - * - */ -public class HornetQXAException extends XAException -{ - private static final long serialVersionUID = 6535914602965015803L; - - public HornetQXAException(final int errorCode, final String message) - { - super(message); - - this.errorCode = errorCode; - } - - public HornetQXAException(final int errorCode) - { - super(errorCode); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/message/BodyEncoder.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/message/BodyEncoder.java b/activemq-core-client/src/main/java/org/apache/activemq/core/message/BodyEncoder.java index 609386e..e926747 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/message/BodyEncoder.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/message/BodyEncoder.java @@ -28,27 +28,27 @@ import org.apache.activemq.api.core.ActiveMQException; public interface BodyEncoder { /** - * This method must not be called directly by HornetQ clients. + * This method must not be called directly by ActiveMQ clients. */ void open() throws ActiveMQException; /** - * This method must not be called directly by HornetQ clients. + * This method must not be called directly by ActiveMQ clients. */ void close() throws ActiveMQException; /** - * This method must not be called directly by HornetQ clients. + * This method must not be called directly by ActiveMQ clients. */ int encode(ByteBuffer bufferRead) throws ActiveMQException; /** - * This method must not be called directly by HornetQ clients. + * This method must not be called directly by ActiveMQ clients. */ int encode(ActiveMQBuffer bufferOut, int size) throws ActiveMQException; /** - * This method must not be called directly by HornetQ clients. + * This method must not be called directly by ActiveMQ clients. */ long getLargeBodySize(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/message/impl/MessageImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/message/impl/MessageImpl.java index 8d4f3d7..a955b9a 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/message/impl/MessageImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/message/impl/MessageImpl.java @@ -23,7 +23,7 @@ import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.api.core.Message; import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.core.buffers.impl.ResetLimitWrappedHornetQBuffer; +import org.apache.activemq.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.core.message.BodyEncoder; import org.apache.activemq.core.protocol.core.impl.PacketImpl; import org.apache.activemq.utils.ByteUtil; @@ -34,7 +34,7 @@ import org.apache.activemq.utils.UUID; /** * A concrete implementation of a message * <p> - * All messages handled by HornetQ core are of this type + * All messages handled by ActiveMQ core are of this type * * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> * @author <a href="mailto:[email protected]">Tim Fox</a> @@ -78,7 +78,7 @@ public abstract class MessageImpl implements MessageInternal protected ActiveMQBuffer buffer; - protected ResetLimitWrappedHornetQBuffer bodyBuffer; + protected ResetLimitWrappedActiveMQBuffer bodyBuffer; protected volatile boolean bufferValid; @@ -100,7 +100,7 @@ public abstract class MessageImpl implements MessageInternal } /** - * overridden by the client message, we need access to the connection so we can create the appropriate HornetQBuffer. + * overridden by the client message, we need access to the connection so we can create the appropriate ActiveMQBuffer. * * @param type * @param durable @@ -265,7 +265,7 @@ public abstract class MessageImpl implements MessageInternal { if (bodyBuffer == null) { - bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this); + bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this); } return bodyBuffer; @@ -299,7 +299,7 @@ public abstract class MessageImpl implements MessageInternal newBuffer.setIndex(0, getEndOfBodyPosition()); - return new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, newBuffer, null); + return new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, newBuffer, null); } public long getMessageID() http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/CoreRemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/CoreRemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/CoreRemotingConnection.java index 07fd491..f9af32e 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/CoreRemotingConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/CoreRemotingConnection.java @@ -12,12 +12,12 @@ */ package org.apache.activemq.core.protocol.core; -import org.apache.activemq.core.security.HornetQPrincipal; +import org.apache.activemq.core.security.ActiveMQPrincipal; import org.apache.activemq.spi.core.protocol.RemotingConnection; /** - * Extension of RemotingConnection for the HornetQ core protocol + * Extension of RemotingConnection for the ActiveMQ core protocol * @author Tim Fox */ public interface CoreRemotingConnection extends RemotingConnection @@ -100,5 +100,5 @@ public interface CoreRemotingConnection extends RemotingConnection * Returns the default security principal * @return the principal */ - HornetQPrincipal getDefaultHornetQPrincipal(); + ActiveMQPrincipal getDefaultActiveMQPrincipal(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java new file mode 100644 index 0000000..ba593cc --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -0,0 +1,613 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.core.impl; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import io.netty.channel.ChannelPipeline; +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.ActiveMQInterruptedException; +import org.apache.activemq.api.core.Interceptor; +import org.apache.activemq.api.core.Pair; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.core.client.ActiveMQClientLogger; +import org.apache.activemq.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.core.protocol.ClientPacketDecoder; +import org.apache.activemq.core.protocol.core.Channel; +import org.apache.activemq.core.protocol.core.ChannelHandler; +import org.apache.activemq.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.core.protocol.core.Packet; +import org.apache.activemq.core.protocol.core.impl.wireformat.CheckFailoverMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2; +import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3; +import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectMessage_V2; +import org.apache.activemq.core.protocol.core.impl.wireformat.Ping; +import org.apache.activemq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq.core.remoting.impl.netty.ActiveMQFrameDecoder2; +import org.apache.activemq.core.version.Version; +import org.apache.activemq.spi.core.protocol.RemotingConnection; +import org.apache.activemq.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.spi.core.remoting.Connection; +import org.apache.activemq.spi.core.remoting.TopologyResponseHandler; +import org.apache.activemq.spi.core.remoting.SessionContext; +import org.apache.activemq.utils.VersionLoader; + +/** + * This class will return specific packets for different types of actions happening on a messaging protocol. + * <p/> + * This is trying to unify the Core client into multiple protocols. + * <p/> + * Returning null in certain packets means no action is taken on this specific protocol. + * <p/> + * Semantic properties could also be added to this implementation. + * <p/> + * Implementations of this class need to be stateless. + * + * @author Clebert Suconic + */ + +public class ActiveMQClientProtocolManager implements ClientProtocolManager +{ + private final int versionID = VersionLoader.getVersion().getIncrementingVersion(); + + private ClientSessionFactoryInternal factoryInternal; + + /** + * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch} + */ + private final Object inCreateSessionGuard = new Object(); + + /** + * Flag that tells whether we are trying to create a session. + */ + private boolean inCreateSession; + + /** + * Used to wait for the creation of a session. + */ + private CountDownLatch inCreateSessionLatch; + + protected volatile RemotingConnectionImpl connection; + + protected TopologyResponseHandler topologyResponseHandler; + + /** + * Flag that signals that the communication is closing. Causes many processes to exit. + */ + private volatile boolean alive = true; + + private final CountDownLatch waitLatch = new CountDownLatch(1); + + + public ActiveMQClientProtocolManager() + { + } + + public String getName() + { + return ActiveMQClient.DEFAULT_CORE_PROTOCOL; + } + + public void setSessionFactory(ClientSessionFactory factory) + { + this.factoryInternal = (ClientSessionFactoryInternal)factory; + } + + public ClientSessionFactory getSessionFactory() + { + return this.factoryInternal; + } + + @Override + public void addChannelHandlers(ChannelPipeline pipeline) + { + pipeline.addLast("activemq-decoder", new ActiveMQFrameDecoder2()); + } + + public boolean waitOnLatch(long milliseconds) throws InterruptedException + { + return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS); + } + + public Channel getChannel0() + { + if (connection == null) + { + return null; + } + else + { + return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); + } + } + + public RemotingConnection getCurrentConnection() + { + return connection; + } + + + public Channel getChannel1() + { + if (connection == null) + { + return null; + } + else + { + return connection.getChannel(1, -1); + } + } + + public Lock lockSessionCreation() + { + try + { + Lock localFailoverLock = factoryInternal.lockFailover(); + try + { + if (connection == null) + { + return null; + } + + Lock lock = getChannel1().getLock(); + + // Lock it - this must be done while the failoverLock is held + while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS)) + { + } + + return lock; + } + finally + { + localFailoverLock.unlock(); + } + // We can now release the failoverLock + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return null; + } + } + + + public void stop() + { + alive = false; + + + synchronized (inCreateSessionGuard) + { + if (inCreateSessionLatch != null) + inCreateSessionLatch.countDown(); + } + + + Channel channel1 = getChannel1(); + if (channel1 != null) + { + channel1.returnBlocking(); + } + + waitLatch.countDown(); + + } + + public boolean isAlive() + { + return alive; + } + + + @Override + public void ping(long connectionTTL) + { + Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); + + Ping ping = new Ping(connectionTTL); + + channel.send(ping); + + connection.flush(); + } + + @Override + public void sendSubscribeTopology(final boolean isServer) + { + getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, + VersionLoader.getVersion() + .getIncrementingVersion())); + } + + @Override + public SessionContext createSessionContext(String name, String username, String password, + boolean xa, boolean autoCommitSends, boolean autoCommitAcks, + boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException + { + for (Version clientVersion : VersionLoader.getClientVersions()) + { + try + { + return createSessionContext(clientVersion, + name, + username, + password, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + minLargeMessageSize, + confirmationWindowSize); + } + catch (ActiveMQException e) + { + if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) + { + throw e; + } + } + } + connection.destroy(); + throw new ActiveMQException(ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS); + } + + public SessionContext createSessionContext(Version clientVersion, String name, String username, String password, + boolean xa, boolean autoCommitSends, boolean autoCommitAcks, + boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException + { + if (!isAlive()) + throw ActiveMQClientMessageBundle.BUNDLE.clientSessionClosed(); + + Channel sessionChannel = null; + CreateSessionResponseMessage response = null; + + boolean retry; + do + { + retry = false; + + Lock lock = null; + + try + { + + lock = lockSessionCreation(); + + // We now set a flag saying createSession is executing + synchronized (inCreateSessionGuard) + { + if (!isAlive()) + throw ActiveMQClientMessageBundle.BUNDLE.clientSessionClosed(); + inCreateSession = true; + inCreateSessionLatch = new CountDownLatch(1); + } + + long sessionChannelID = connection.generateChannelID(); + + Packet request = new CreateSessionMessage(name, + sessionChannelID, + clientVersion.getIncrementingVersion(), + username, + password, + minLargeMessageSize, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + confirmationWindowSize, + null); + + + try + { + // channel1 reference here has to go away + response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP); + } + catch (ActiveMQException cause) + { + if (!isAlive()) + throw cause; + + if (cause.getType() == ActiveMQExceptionType.UNBLOCKED) + { + // This means the thread was blocked on create session and failover unblocked it + // so failover could occur + + retry = true; + + continue; + } + else + { + throw cause; + } + } + + sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize); + + + } + catch (Throwable t) + { + if (lock != null) + { + lock.unlock(); + lock = null; + } + + if (t instanceof ActiveMQException) + { + throw (ActiveMQException) t; + } + else + { + throw ActiveMQClientMessageBundle.BUNDLE.failedToCreateSession(t); + } + } + finally + { + if (lock != null) + { + lock.unlock(); + } + + // Execution has finished so notify any failover thread that may be waiting for us to be done + inCreateSession = false; + inCreateSessionLatch.countDown(); + } + } + while (retry); + + + // these objects won't be null, otherwise it would keep retrying on the previous loop + return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); + + } + + public boolean cleanupBeforeFailover(ActiveMQException cause) + { + + boolean needToInterrupt; + + CountDownLatch exitLockLatch; + Lock lock = lockSessionCreation(); + + if (lock == null) + { + return false; + } + + try + { + synchronized (inCreateSessionGuard) + { + needToInterrupt = inCreateSession; + exitLockLatch = inCreateSessionLatch; + } + } + finally + { + lock.unlock(); + } + + if (needToInterrupt) + { + forceReturnChannel1(cause); + + // Now we need to make sure that the thread has actually exited and returned it's + // connections + // before failover occurs + + while (inCreateSession && isAlive()) + { + try + { + if (exitLockLatch != null) + { + exitLockLatch.await(500, TimeUnit.MILLISECONDS); + } + } + catch (InterruptedException e1) + { + throw new ActiveMQInterruptedException(e1); + } + } + } + + return true; + } + + @Override + public boolean checkForFailover(String liveNodeID) throws ActiveMQException + { + CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID); + CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet, + PacketImpl.CHECK_FOR_FAILOVER_REPLY); + return message.isOkToFailover(); + } + + + public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, + List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, + TopologyResponseHandler topologyResponseHandler) + { + this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, + callTimeout, callFailoverTimeout, + incomingInterceptors, outgoingInterceptors); + + this.topologyResponseHandler = topologyResponseHandler; + + getChannel0().setHandler(new Channel0Handler(connection)); + + + sendHandshake(transportConnection); + + return connection; + } + + private void sendHandshake(Connection transportConnection) + { + if (transportConnection.isUsingProtocolHandling()) + { + // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling + String handshake = "ACTIVEMQ"; + ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length()); + amqbuffer.writeBytes(handshake.getBytes()); + transportConnection.write(amqbuffer); + } + } + + + private class Channel0Handler implements ChannelHandler + { + private final CoreRemotingConnection conn; + + private Channel0Handler(final CoreRemotingConnection conn) + { + this.conn = conn; + } + + public void handlePacket(final Packet packet) + { + final byte type = packet.getType(); + + if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2) + { + final DisconnectMessage msg = (DisconnectMessage) packet; + String scaleDownTargetNodeID = null; + + SimpleString nodeID = msg.getNodeID(); + + if (packet instanceof DisconnectMessage_V2) + { + final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet; + scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString(); + } + + if (topologyResponseHandler != null) + topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY) + { + ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) + { + ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) + { + ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) + { + System.out.println("Channel0Handler.handlePacket"); + } + } + + /** + * @param topMessage + */ + private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) + { + final long eventUID; + final String backupGroupName; + final String scaleDownGroupName; + if (topMessage instanceof ClusterTopologyChangeMessage_V3) + { + eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID(); + backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName(); + scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName(); + } + else if (topMessage instanceof ClusterTopologyChangeMessage_V2) + { + eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID(); + backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName(); + scaleDownGroupName = null; + } + else + { + eventUID = System.currentTimeMillis(); + backupGroupName = null; + scaleDownGroupName = null; + } + + if (topMessage.isExit()) + { + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) + { + ActiveMQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down"); + } + + if (topologyResponseHandler != null) + { + topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID()); + } + } + else + { + Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair(); + if (transportConfig.getA() == null && transportConfig.getB() == null) + { + transportConfig = new Pair<>(conn.getTransportConnection() + .getConnectorConfig(), + null); + } + + if (topologyResponseHandler != null) + { + topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast()); + } + } + } + } + + protected PacketDecoder getPacketDecoder() + { + return ClientPacketDecoder.INSTANCE; + } + + private void forceReturnChannel1(ActiveMQException cause) + { + if (connection != null) + { + Channel channel1 = connection.getChannel(1, -1); + + if (channel1 != null) + { + channel1.returnBlocking(cause); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java new file mode 100644 index 0000000..857fedc --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.core.impl; + +import org.apache.activemq.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.spi.core.remoting.ClientProtocolManagerFactory; + +/** + * @author Clebert Suconic + */ + +public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManagerFactory +{ + private static final long serialVersionUID = 1; + + private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory(); + + private ActiveMQClientProtocolManagerFactory() + { + } + + public static final ActiveMQClientProtocolManagerFactory getInstance() + { + return INSTANCE; + } + + public ClientProtocolManager newProtocolManager() + { + return new ActiveMQClientProtocolManager(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQConsumerContext.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQConsumerContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQConsumerContext.java new file mode 100644 index 0000000..0747f48 --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQConsumerContext.java @@ -0,0 +1,54 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.core.impl; + +import org.apache.activemq.spi.core.remoting.ConsumerContext; + +/** + * @author Clebert Suconic + */ + +public class ActiveMQConsumerContext extends ConsumerContext +{ + private long id; + + public ActiveMQConsumerContext(long id) + { + this.id = id; + } + + public long getId() + { + return id; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ActiveMQConsumerContext that = (ActiveMQConsumerContext) o; + + if (id != that.id) return false; + + return true; + } + + @Override + public int hashCode() + { + return (int) (id ^ (id >>> 32)); + } +}
