http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorImpl.java new file mode 100644 index 0000000..c37036a --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorImpl.java @@ -0,0 +1,2126 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.HornetQIllegalStateException; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.Interceptor; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.ClusterTopologyListener; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.api.core.client.TopologyMember; +import org.apache.activemq6.api.core.client.loadbalance.ConnectionLoadBalancingPolicy; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.cluster.DiscoveryEntry; +import org.apache.activemq6.core.cluster.DiscoveryGroup; +import 
org.apache.activemq6.core.cluster.DiscoveryListener; +import org.apache.activemq6.core.protocol.core.impl.HornetQClientProtocolManagerFactory; +import org.apache.activemq6.core.remoting.FailureListener; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManagerFactory; +import org.apache.activemq6.spi.core.remoting.Connector; +import org.apache.activemq6.utils.ClassloadingUtil; +import org.apache.activemq6.utils.HornetQThreadFactory; +import org.apache.activemq6.utils.UUIDGenerator; + +/** + * This is the implementation of {@link org.apache.activemq6.api.core.client.ServerLocator} and all + * the proper javadoc is located on that interface. + * + * @author Tim Fox + */ +public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable +{ + /*needed for backward compatibility*/ + @SuppressWarnings("unused") + private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>(); + + /*end of compatibility fixes*/ + private enum STATE + { + INITIALIZED, CLOSED, CLOSING + } + + private static final long serialVersionUID = -1615857864410205260L; + + + // This is the default value + private ClientProtocolManagerFactory protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance(); + + private final boolean ha; + + private boolean finalizeCheck = true; + + private boolean clusterConnection; + + private transient String identity; + + private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>(); + + private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>(); + + private volatile TransportConfiguration[] initialConnectors; + + private final DiscoveryGroupConfiguration discoveryGroupConfiguration; + + private final StaticConnector staticConnector = new StaticConnector(); + + private final Topology topology; + + //needs 
to be serializable and not final for retrocompatibility + private String topologyArrayGuard = new String(); + + private volatile Pair<TransportConfiguration, TransportConfiguration>[] topologyArray; + + private volatile boolean receivedTopology; + + private boolean compressLargeMessage; + + // if the system should shutdown the pool when shutting down + private transient boolean shutdownPool; + + private transient ExecutorService threadPool; + + private transient ScheduledExecutorService scheduledThreadPool; + + private transient DiscoveryGroup discoveryGroup; + + private transient ConnectionLoadBalancingPolicy loadBalancingPolicy; + + // Settable attributes: + + private boolean cacheLargeMessagesClient; + + private long clientFailureCheckPeriod; + + private long connectionTTL; + + private long callTimeout; + + private long callFailoverTimeout; + + private int minLargeMessageSize; + + private int consumerWindowSize; + + private int consumerMaxRate; + + private int confirmationWindowSize; + + private int producerWindowSize; + + private int producerMaxRate; + + private boolean blockOnAcknowledge; + + private boolean blockOnDurableSend; + + private boolean blockOnNonDurableSend; + + private boolean autoGroup; + + private boolean preAcknowledge; + + private String connectionLoadBalancingPolicyClassName; + + private int ackBatchSize; + + private boolean useGlobalPools; + + private int scheduledThreadPoolMaxSize; + + private int threadPoolMaxSize; + + private long retryInterval; + + private double retryIntervalMultiplier; + + private long maxRetryInterval; + + private int reconnectAttempts; + + private int initialConnectAttempts; + + private boolean failoverOnInitialConnection; + + private int initialMessagePacketSize; + + //needs to be serializable and not final for retrocompatibility + private String stateGuard = new String(); + private transient STATE state; + private transient CountDownLatch latch; + + private final List<Interceptor> incomingInterceptors = new 
CopyOnWriteArrayList<Interceptor>(); + + private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<Interceptor>(); + + private static ExecutorService globalThreadPool; + + private Executor startExecutor; + + private static ScheduledExecutorService globalScheduledThreadPool; + + private AfterConnectInternalListener afterConnectListener; + + private String groupID; + + private String nodeID; + + private TransportConfiguration clusterTransportConfiguration; + + /* + * *************WARNING*************** + * remember that when adding any new classes that we have to support serialization with previous clients. + * If you need to, make them transient and handle the serialization yourself + * */ + + private final Exception traceException = new Exception(); + + // To be called when there are ServerLocator being finalized. + // To be used on test assertions + public static Runnable finalizeCallback = null; + + public static synchronized void clearThreadPools() + { + + if (globalThreadPool != null) + { + globalThreadPool.shutdown(); + try + { + if (!globalThreadPool.awaitTermination(10, TimeUnit.SECONDS)) + { + throw new IllegalStateException("Couldn't finish the globalThreadPool"); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + finally + { + globalThreadPool = null; + } + } + + if (globalScheduledThreadPool != null) + { + globalScheduledThreadPool.shutdown(); + try + { + if (!globalScheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) + { + throw new IllegalStateException("Couldn't finish the globalScheduledThreadPool"); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + finally + { + globalScheduledThreadPool = null; + } + } + } + + private static synchronized ExecutorService getGlobalThreadPool() + { + if (globalThreadPool == null) + { + ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader()); + + 
globalThreadPool = Executors.newCachedThreadPool(factory); + } + + return globalThreadPool; + } + + private static synchronized ScheduledExecutorService getGlobalScheduledThreadPool() + { + if (globalScheduledThreadPool == null) + { + ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads", + true, + getThisClassLoader()); + + globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, + + factory); + } + + return globalScheduledThreadPool; + } + + private synchronized void setThreadPools() + { + if (threadPool != null) + { + return; + } + else if (useGlobalPools) + { + threadPool = getGlobalThreadPool(); + + scheduledThreadPool = getGlobalScheduledThreadPool(); + } + else + { + this.shutdownPool = true; + + ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this), + true, + getThisClassLoader()); + + if (threadPoolMaxSize == -1) + { + threadPool = Executors.newCachedThreadPool(factory); + } + else + { + threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory); + } + + factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this), + true, + getThisClassLoader()); + + scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory); + } + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return ClientSessionFactoryImpl.class.getClassLoader(); + } + }); + + } + + + private void instantiateLoadBalancingPolicy() + { + if (connectionLoadBalancingPolicyClassName == null) + { + throw new IllegalStateException("Please specify a load balancing policy class name on the session factory"); + } + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + loadBalancingPolicy = 
/**
 * Primary constructor that every public constructor delegates to.
 * <p>
 * Stores the topology (creating one owned by this locator when {@code null} is passed), the HA
 * flag, the discovery configuration and the static connector list, generates a fresh node ID,
 * and initialises every settable attribute from its {@link HornetQClient} default.
 *
 * @param topology                    topology to use, or {@code null} to create a new one
 * @param useHA                       value stored in the {@code ha} flag
 * @param discoveryGroupConfiguration discovery settings; may be {@code null}
 * @param transportConfigs            static list of connectors; may be {@code null}
 */
private ServerLocatorImpl(final Topology topology,
                          final boolean useHA,
                          final DiscoveryGroupConfiguration discoveryGroupConfiguration,
                          final TransportConfiguration[] transportConfigs)
{
   traceException.fillInStackTrace();

   this.topology = topology == null ? new Topology(this) : topology;

   this.ha = useHA;

   this.discoveryGroupConfiguration = discoveryGroupConfiguration;

   // May legitimately be null; readers of initialConnectors check for that.
   this.initialConnectors = transportConfigs;

   this.nodeID = UUIDGenerator.getInstance().generateStringUUID();

   clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;

   connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;

   callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;

   callFailoverTimeout = HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT;

   minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;

   consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;

   consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;

   confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;

   producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;

   producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;

   blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;

   blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;

   blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;

   autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;

   preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;

   ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;

   connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;

   useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;

   scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;

   threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;

   retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;

   retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;

   maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;

   reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;

   initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;

   failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;

   cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;

   initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;

   compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;

   clusterConnection = false;
}
transportConfigs) + { + this(topology, useHA, null, transportConfigs); + } + + public void resetToInitialConnectors() + { + synchronized (topologyArrayGuard) + { + receivedTopology = false; + topologyArray = null; + topology.clear(); + } + } + + private ServerLocatorImpl(ServerLocatorImpl locator) + { + ha = locator.ha; + finalizeCheck = locator.finalizeCheck; + clusterConnection = locator.clusterConnection; + initialConnectors = locator.initialConnectors; + discoveryGroupConfiguration = locator.discoveryGroupConfiguration; + topology = locator.topology; + topologyArray = locator.topologyArray; + receivedTopology = locator.receivedTopology; + compressLargeMessage = locator.compressLargeMessage; + cacheLargeMessagesClient = locator.cacheLargeMessagesClient; + clientFailureCheckPeriod = locator.clientFailureCheckPeriod; + connectionTTL = locator.connectionTTL; + callTimeout = locator.callTimeout; + callFailoverTimeout = locator.callFailoverTimeout; + minLargeMessageSize = locator.minLargeMessageSize; + consumerWindowSize = locator.consumerWindowSize; + consumerMaxRate = locator.consumerMaxRate; + confirmationWindowSize = locator.confirmationWindowSize; + producerWindowSize = locator.producerWindowSize; + producerMaxRate = locator.producerMaxRate; + blockOnAcknowledge = locator.blockOnAcknowledge; + blockOnDurableSend = locator.blockOnDurableSend; + blockOnNonDurableSend = locator.blockOnNonDurableSend; + autoGroup = locator.autoGroup; + preAcknowledge = locator.preAcknowledge; + connectionLoadBalancingPolicyClassName = locator.connectionLoadBalancingPolicyClassName; + ackBatchSize = locator.ackBatchSize; + useGlobalPools = locator.useGlobalPools; + scheduledThreadPoolMaxSize = locator.scheduledThreadPoolMaxSize; + threadPoolMaxSize = locator.threadPoolMaxSize; + retryInterval = locator.retryInterval; + retryIntervalMultiplier = locator.retryIntervalMultiplier; + maxRetryInterval = locator.maxRetryInterval; + reconnectAttempts = locator.reconnectAttempts; + 
initialConnectAttempts = locator.initialConnectAttempts; + failoverOnInitialConnection = locator.failoverOnInitialConnection; + initialMessagePacketSize = locator.initialMessagePacketSize; + startExecutor = locator.startExecutor; + afterConnectListener = locator.afterConnectListener; + groupID = locator.groupID; + nodeID = locator.nodeID; + clusterTransportConfiguration = locator.clusterTransportConfiguration; + } + + private synchronized TransportConfiguration selectConnector() + { + Pair<TransportConfiguration, TransportConfiguration>[] usedTopology; + + synchronized (topologyArrayGuard) + { + usedTopology = topologyArray; + } + + // if the topologyArray is null, we will use the initialConnectors + if (usedTopology != null) + { + int pos = loadBalancingPolicy.select(usedTopology.length); + Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos]; + + return pair.getA(); + } + else + { + // Get from initialconnectors + + int pos = loadBalancingPolicy.select(initialConnectors.length); + + return initialConnectors[pos]; + } + } + + public void start(Executor executor) throws Exception + { + initialise(); + + this.startExecutor = executor; + + if (executor != null) + { + executor.execute(new Runnable() + { + public void run() + { + try + { + connect(); + } + catch (Exception e) + { + if (!isClosed()) + { + HornetQClientLogger.LOGGER.errorConnectingToNodes(e); + } + } + } + }); + } + } + + + public ClientProtocolManager newProtocolManager() + { + return getProtocolManagerFactory().newProtocolManager(); + } + + public ClientProtocolManagerFactory getProtocolManagerFactory() + { + if (protocolManagerFactory == null) + { + // this could happen over serialization from older versions + protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance(); + } + return protocolManagerFactory; + } + + public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) + { + this.protocolManagerFactory = protocolManagerFactory; 
+ } + + + public void disableFinalizeCheck() + { + finalizeCheck = false; + } + + @Override + public ClientSessionFactoryInternal connect() throws HornetQException + { + return connect(false); + } + + private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws HornetQException + { + synchronized (this) + { + // static list of initial connectors + if (getNumInitialConnectors() > 0 && discoveryGroup == null) + { + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings); + addFactory(sf); + return sf; + } + } + // wait for discovery group to get the list of initial connectors + return (ClientSessionFactoryInternal) createSessionFactory(); + } + + @Override + public ClientSessionFactoryInternal connectNoWarnings() throws HornetQException + { + return connect(true); + } + + public ServerLocatorImpl setAfterConnectionInternalListener(AfterConnectInternalListener listener) + { + this.afterConnectListener = listener; + return this; + } + + public AfterConnectInternalListener getAfterConnectInternalListener() + { + return afterConnectListener; + } + + public ClientSessionFactory createSessionFactory(String nodeID) throws Exception + { + TopologyMember topologyMember = topology.getMember(nodeID); + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe()); + } + + if (topologyMember == null) + { + return null; + } + if (topologyMember.getLive() != null) + { + ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) createSessionFactory(topologyMember.getLive()); + if (topologyMember.getBackup() != null) + { + factory.setBackupConnector(topologyMember.getLive(), topologyMember.getBackup()); + } + return factory; + } + if (topologyMember.getLive() == null && topologyMember.getBackup() != null) + { + // This shouldn't happen, however I wanted this to 
consider all possible cases + ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) createSessionFactory(topologyMember.getBackup()); + return factory; + } + // it shouldn't happen + return null; + } + + public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception + { + assertOpen(); + + initialise(); + + ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, + transportConfiguration, + callTimeout, + callFailoverTimeout, + clientFailureCheckPeriod, + connectionTTL, + retryInterval, + retryIntervalMultiplier, + maxRetryInterval, + reconnectAttempts, + threadPool, + scheduledThreadPool, + incomingInterceptors, + outgoingInterceptors); + + addToConnecting(factory); + try + { + try + { + factory.connect(reconnectAttempts, failoverOnInitialConnection); + } + catch (HornetQException e1) + { + //we need to make sure is closed just for garbage collection + factory.close(); + throw e1; + } + addFactory(factory); + return factory; + } + finally + { + removeFromConnecting(factory); + } + } + + public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration, int reconnectAttempts, boolean failoverOnInitialConnection) throws Exception + { + assertOpen(); + + initialise(); + + ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, + transportConfiguration, + callTimeout, + callFailoverTimeout, + clientFailureCheckPeriod, + connectionTTL, + retryInterval, + retryIntervalMultiplier, + maxRetryInterval, + reconnectAttempts, + threadPool, + scheduledThreadPool, + incomingInterceptors, + outgoingInterceptors); + + addToConnecting(factory); + try + { + try + { + factory.connect(reconnectAttempts, failoverOnInitialConnection); + } + catch (HornetQException e1) + { + //we need to make sure is closed just for garbage collection + factory.close(); + throw e1; + } + addFactory(factory); + return factory; + } + finally + { + 
removeFromConnecting(factory); + } + } + + private void removeFromConnecting(ClientSessionFactoryInternal factory) + { + synchronized (connectingFactories) + { + connectingFactories.remove(factory); + } + } + + private void addToConnecting(ClientSessionFactoryInternal factory) + { + synchronized (connectingFactories) + { + assertOpen(); + connectingFactories.add(factory); + } + } + + public ClientSessionFactory createSessionFactory() throws HornetQException + { + assertOpen(); + + initialise(); + + if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) + { + // Wait for an initial broadcast to give us at least one node in the cluster + long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout(); + boolean ok = discoveryGroup.waitForBroadcast(timeout); + + if (!ok) + { + throw HornetQClientMessageBundle.BUNDLE.connectionTimedOutInInitialBroadcast(); + } + } + + ClientSessionFactoryInternal factory = null; + + synchronized (this) + { + boolean retry; + int attempts = 0; + do + { + retry = false; + + TransportConfiguration tc = selectConnector(); + if (tc == null) + { + throw HornetQClientMessageBundle.BUNDLE.noTCForSessionFactory(); + } + + // try each factory in the list until we find one which works + + try + { + factory = new ClientSessionFactoryImpl(this, + tc, + callTimeout, + callFailoverTimeout, + clientFailureCheckPeriod, + connectionTTL, + retryInterval, + retryIntervalMultiplier, + maxRetryInterval, + reconnectAttempts, + threadPool, + scheduledThreadPool, + incomingInterceptors, + outgoingInterceptors); + try + { + addToConnecting(factory); + factory.connect(initialConnectAttempts, failoverOnInitialConnection); + } + finally + { + removeFromConnecting(factory); + } + } + catch (HornetQException e) + { + factory.close(); + factory = null; + if (e.getType() == HornetQExceptionType.NOT_CONNECTED) + { + attempts++; + + synchronized (topologyArrayGuard) + { + + if (topologyArray != null && attempts == 
topologyArray.length) + { + throw HornetQClientMessageBundle.BUNDLE.cannotConnectToServers(); + } + if (topologyArray == null && attempts == this.getNumInitialConnectors()) + { + throw HornetQClientMessageBundle.BUNDLE.cannotConnectToServers(); + } + } + retry = true; + } + else + { + throw e; + } + } + } + while (retry); + + // We always wait for the topology, as the server + // will send a single element if not cluster + // so clients can know the id of the server they are connected to + final long timeout = System.currentTimeMillis() + callTimeout; + while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis()) + { + // Now wait for the topology + try + { + wait(1000); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + + // We are waiting for the topology here, + // however to avoid a race where the connection is closed (and receivedtopology set to true) + // between the wait and this timeout here, we redo the check for timeout. 
+ // if this becomes false there's no big deal and we will just ignore the issue + // notice that we can't add more locks here otherwise there wouldn't be able to avoid a deadlock + final boolean hasTimedOut = timeout > System.currentTimeMillis(); + if (!hasTimedOut && !receivedTopology) + { + if (factory != null) + factory.cleanup(); + throw HornetQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); + } + + addFactory(factory); + + return factory; + } + + } + + public boolean isHA() + { + return ha; + } + + public boolean isCacheLargeMessagesClient() + { + return cacheLargeMessagesClient; + } + + public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached) + { + cacheLargeMessagesClient = cached; + return this; + } + + public long getClientFailureCheckPeriod() + { + return clientFailureCheckPeriod; + } + + public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod) + { + checkWrite(); + this.clientFailureCheckPeriod = clientFailureCheckPeriod; + return this; + } + + public long getConnectionTTL() + { + return connectionTTL; + } + + public ServerLocatorImpl setConnectionTTL(final long connectionTTL) + { + checkWrite(); + this.connectionTTL = connectionTTL; + return this; + } + + public long getCallTimeout() + { + return callTimeout; + } + + public ServerLocatorImpl setCallTimeout(final long callTimeout) + { + checkWrite(); + this.callTimeout = callTimeout; + return this; + } + + public long getCallFailoverTimeout() + { + return callFailoverTimeout; + } + + public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout) + { + checkWrite(); + this.callFailoverTimeout = callFailoverTimeout; + return this; + } + + public int getMinLargeMessageSize() + { + return minLargeMessageSize; + } + + public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize) + { + checkWrite(); + this.minLargeMessageSize = minLargeMessageSize; + return this; + } + + public int 
getConsumerWindowSize() + { + return consumerWindowSize; + } + + public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize) + { + checkWrite(); + this.consumerWindowSize = consumerWindowSize; + return this; + } + + public int getConsumerMaxRate() + { + return consumerMaxRate; + } + + public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate) + { + checkWrite(); + this.consumerMaxRate = consumerMaxRate; + return this; + } + + public int getConfirmationWindowSize() + { + return confirmationWindowSize; + } + + public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize) + { + checkWrite(); + this.confirmationWindowSize = confirmationWindowSize; + return this; + } + + public int getProducerWindowSize() + { + return producerWindowSize; + } + + public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize) + { + checkWrite(); + this.producerWindowSize = producerWindowSize; + return this; + } + + public int getProducerMaxRate() + { + return producerMaxRate; + } + + public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate) + { + checkWrite(); + this.producerMaxRate = producerMaxRate; + return this; + } + + public boolean isBlockOnAcknowledge() + { + return blockOnAcknowledge; + } + + public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge) + { + checkWrite(); + this.blockOnAcknowledge = blockOnAcknowledge; + return this; + } + + public boolean isBlockOnDurableSend() + { + return blockOnDurableSend; + } + + public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend) + { + checkWrite(); + this.blockOnDurableSend = blockOnDurableSend; + return this; + } + + public boolean isBlockOnNonDurableSend() + { + return blockOnNonDurableSend; + } + + public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) + { + checkWrite(); + this.blockOnNonDurableSend = blockOnNonDurableSend; + return this; + } + + public boolean isAutoGroup() + 
{ + return autoGroup; + } + + public ServerLocatorImpl setAutoGroup(final boolean autoGroup) + { + checkWrite(); + this.autoGroup = autoGroup; + return this; + } + + public boolean isPreAcknowledge() + { + return preAcknowledge; + } + + public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge) + { + checkWrite(); + this.preAcknowledge = preAcknowledge; + return this; + } + + public int getAckBatchSize() + { + return ackBatchSize; + } + + public ServerLocatorImpl setAckBatchSize(final int ackBatchSize) + { + checkWrite(); + this.ackBatchSize = ackBatchSize; + return this; + } + + public boolean isUseGlobalPools() + { + return useGlobalPools; + } + + public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools) + { + checkWrite(); + this.useGlobalPools = useGlobalPools; + return this; + } + + public int getScheduledThreadPoolMaxSize() + { + return scheduledThreadPoolMaxSize; + } + + public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) + { + checkWrite(); + this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; + return this; + } + + public int getThreadPoolMaxSize() + { + return threadPoolMaxSize; + } + + public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize) + { + checkWrite(); + this.threadPoolMaxSize = threadPoolMaxSize; + return this; + } + + public long getRetryInterval() + { + return retryInterval; + } + + public ServerLocatorImpl setRetryInterval(final long retryInterval) + { + checkWrite(); + this.retryInterval = retryInterval; + return this; + } + + public long getMaxRetryInterval() + { + return maxRetryInterval; + } + + public ServerLocatorImpl setMaxRetryInterval(final long retryInterval) + { + checkWrite(); + maxRetryInterval = retryInterval; + return this; + } + + public double getRetryIntervalMultiplier() + { + return retryIntervalMultiplier; + } + + public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier) + { + checkWrite(); + 
this.retryIntervalMultiplier = retryIntervalMultiplier; + return this; + } + + public int getReconnectAttempts() + { + return reconnectAttempts; + } + + public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts) + { + checkWrite(); + this.reconnectAttempts = reconnectAttempts; + return this; + } + + public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts) + { + checkWrite(); + this.initialConnectAttempts = initialConnectAttempts; + return this; + } + + public int getInitialConnectAttempts() + { + return initialConnectAttempts; + } + + public boolean isFailoverOnInitialConnection() + { + return this.failoverOnInitialConnection; + } + + public ServerLocatorImpl setFailoverOnInitialConnection(final boolean failover) + { + checkWrite(); + this.failoverOnInitialConnection = failover; + return this; + } + + public String getConnectionLoadBalancingPolicyClassName() + { + return connectionLoadBalancingPolicyClassName; + } + + public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) + { + checkWrite(); + connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName; + return this; + } + + public TransportConfiguration[] getStaticTransportConfigurations() + { + if (initialConnectors == null) return new TransportConfiguration[]{}; + return Arrays.copyOf(initialConnectors, initialConnectors.length); + } + + public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() + { + return discoveryGroupConfiguration; + } + + @Override + @Deprecated + public void addInterceptor(final Interceptor interceptor) + { + addIncomingInterceptor(interceptor); + } + + public ServerLocatorImpl addIncomingInterceptor(final Interceptor interceptor) + { + incomingInterceptors.add(interceptor); + return this; + } + + public ServerLocatorImpl addOutgoingInterceptor(final Interceptor interceptor) + { + outgoingInterceptors.add(interceptor); + return this; + } + + @Override + @Deprecated + public 
boolean removeInterceptor(final Interceptor interceptor) + { + return removeIncomingInterceptor(interceptor); + } + + public boolean removeIncomingInterceptor(final Interceptor interceptor) + { + return incomingInterceptors.remove(interceptor); + } + + public boolean removeOutgoingInterceptor(final Interceptor interceptor) + { + return outgoingInterceptors.remove(interceptor); + } + + public int getInitialMessagePacketSize() + { + return initialMessagePacketSize; + } + + public ServerLocatorImpl setInitialMessagePacketSize(final int size) + { + checkWrite(); + initialMessagePacketSize = size; + return this; + } + + public ServerLocatorImpl setGroupID(final String groupID) + { + checkWrite(); + this.groupID = groupID; + return this; + } + + public String getGroupID() + { + return groupID; + } + + public boolean isCompressLargeMessage() + { + return compressLargeMessage; + } + + public ServerLocatorImpl setCompressLargeMessage(boolean avoid) + { + this.compressLargeMessage = avoid; + return this; + } + + private void checkWrite() + { + synchronized (stateGuard) + { + if (state != null && state != STATE.CLOSED) + { + throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used"); + } + } + } + + private int getNumInitialConnectors() + { + if (initialConnectors == null) return 0; + return initialConnectors.length; + } + + public ServerLocatorImpl setIdentity(String identity) + { + this.identity = identity; + return this; + } + + public ServerLocatorImpl setNodeID(String nodeID) + { + this.nodeID = nodeID; + return this; + } + + public String getNodeID() + { + return nodeID; + } + + public ServerLocatorImpl setClusterConnection(boolean clusterConnection) + { + this.clusterConnection = clusterConnection; + return this; + } + + public boolean isClusterConnection() + { + return clusterConnection; + } + + public TransportConfiguration getClusterTransportConfiguration() + { + return clusterTransportConfiguration; + } + + public 
ServerLocatorImpl setClusterTransportConfiguration(TransportConfiguration tc) + { + this.clusterTransportConfiguration = tc; + return this; + } + + @Override + protected void finalize() throws Throwable + { + if (finalizeCheck) + { + close(); + } + + super.finalize(); + } + + public void cleanup() + { + doClose(false); + } + + public void close() + { + doClose(true); + } + + private void doClose(final boolean sendClose) + { + synchronized (stateGuard) + { + if (state == STATE.CLOSED) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug(this + " is already closed when calling closed"); + } + return; + } + + state = STATE.CLOSING; + } + if (latch != null) + latch.countDown(); + + synchronized (connectingFactories) + { + for (ClientSessionFactoryInternal csf : connectingFactories) + { + csf.causeExit(); + } + } + + if (discoveryGroup != null) + { + synchronized (this) + { + try + { + discoveryGroup.stop(); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.failedToStopDiscovery(e); + } + } + } + else + { + staticConnector.disconnect(); + } + + synchronized (connectingFactories) + { + for (ClientSessionFactoryInternal csf : connectingFactories) + { + csf.causeExit(); + } + for (ClientSessionFactoryInternal csf : connectingFactories) + { + csf.close(); + } + connectingFactories.clear(); + } + + Set<ClientSessionFactoryInternal> clonedFactory; + synchronized (factories) + { + clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories); + + factories.clear(); + } + + for (ClientSessionFactoryInternal factory : clonedFactory) + { + factory.causeExit(); + } + for (ClientSessionFactory factory : clonedFactory) + { + if (sendClose) + { + factory.close(); + } + else + { + factory.cleanup(); + } + } + + if (shutdownPool) + { + if (threadPool != null) + { + threadPool.shutdown(); + + try + { + if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) + { + HornetQClientLogger.LOGGER.timedOutWaitingForTermination(); 
+ } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + if (scheduledThreadPool != null) + { + scheduledThreadPool.shutdown(); + + try + { + if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) + { + HornetQClientLogger.LOGGER.timedOutWaitingForScheduledPoolTermination(); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + synchronized (stateGuard) + { + state = STATE.CLOSED; + } + } + + /** + * This is directly called when the connection to the node is gone, + * or when the node sends a disconnection. + * Look for callers of this method! + */ + @Override + public void notifyNodeDown(final long eventTime, final String nodeID) + { + + if (!ha) + { + // there's no topology here + return; + } + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace")); + } + + topology.removeMember(eventTime, nodeID); + + if (clusterConnection) + { + updateArraysAndPairs(); + } + else + { + synchronized (topologyArrayGuard) + { + if (topology.isEmpty()) + { + // Resetting the topology to its original condition as it was brand new + receivedTopology = false; + topologyArray = null; + } + else + { + updateArraysAndPairs(); + + if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) + { + // Resetting the topology to its original condition as it was brand new + receivedTopology = false; + } + } + } + } + + } + + public void notifyNodeUp(long uniqueEventID, + final String nodeID, + final String backupGroupName, + final String scaleDownGroupName, + final Pair<TransportConfiguration, TransportConfiguration> connectorPair, + final boolean last) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace")); + } + + 
TopologyMemberImpl member = new TopologyMemberImpl(nodeID, backupGroupName, scaleDownGroupName, connectorPair.getA(), connectorPair.getB()); + + topology.updateMember(uniqueEventID, nodeID, member); + + TopologyMember actMember = topology.getMember(nodeID); + + if (actMember != null && actMember.getLive() != null && actMember.getBackup() != null) + { + HashSet<ClientSessionFactory> clonedFactories = new HashSet<ClientSessionFactory>(); + synchronized (factories) + { + clonedFactories.addAll(factories); + } + + for (ClientSessionFactory factory : clonedFactories) + { + ((ClientSessionFactoryInternal) factory).setBackupConnector(actMember.getLive(), actMember.getBackup()); + } + } + + updateArraysAndPairs(); + + if (last) + { + synchronized (this) + { + receivedTopology = true; + // Notify if waiting on getting topology + notifyAll(); + } + } + } + + @Override + public String toString() + { + if (identity != null) + { + return "ServerLocatorImpl (identity=" + identity + + ") [initialConnectors=" + + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + + ", discoveryGroupConfiguration=" + + discoveryGroupConfiguration + + "]"; + } + return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors == null ? 
new TransportConfiguration[0] : initialConnectors) + + ", discoveryGroupConfiguration=" + + discoveryGroupConfiguration + + "]"; + } + + @SuppressWarnings("unchecked") + private void updateArraysAndPairs() + { + synchronized (topologyArrayGuard) + { + Collection<TopologyMemberImpl> membersCopy = topology.getMembers(); + + Pair<TransportConfiguration, TransportConfiguration>[] topologyArrayLocal = + (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class, + membersCopy.size()); + + int count = 0; + for (TopologyMemberImpl pair : membersCopy) + { + topologyArrayLocal[count++] = pair.getConnector(); + } + + this.topologyArray = topologyArrayLocal; + } + } + + public synchronized void connectorsChanged(List<DiscoveryEntry> newConnectors) + { + if (receivedTopology) + { + return; + } + TransportConfiguration[] newInitialconnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, + newConnectors.size()); + + int count = 0; + for (DiscoveryEntry entry : newConnectors) + { + newInitialconnectors[count++] = entry.getConnector(); + + if (ha && topology.getMember(entry.getNodeID()) == null) + { + TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null); + // on this case we set it as zero as any update coming from server should be accepted + topology.updateMember(0, entry.getNodeID(), member); + } + } + + this.initialConnectors = newInitialconnectors.length == 0 ? null : newInitialconnectors; + + if (clusterConnection && !receivedTopology && this.getNumInitialConnectors() > 0) + { + // The node is alone in the cluster. We create a connection to the new node + // to trigger the node notification to form the cluster. 
+ + Runnable connectRunnable = new Runnable() + { + public void run() + { + try + { + connect(); + } + catch (HornetQException e) + { + HornetQClientLogger.LOGGER.errorConnectingToNodes(e); + } + } + }; + if (startExecutor != null) + { + startExecutor.execute(connectRunnable); + } + else + { + connectRunnable.run(); + } + } + } + + public void factoryClosed(final ClientSessionFactory factory) + { + synchronized (factories) + { + factories.remove(factory); + + if (!clusterConnection && factories.isEmpty()) + { + // Go back to using the broadcast or static list + synchronized (topologyArrayGuard) + { + receivedTopology = false; + + topologyArray = null; + } + } + } + } + + public Topology getTopology() + { + return topology; + } + + @Override + public boolean isConnectable() + { + return getNumInitialConnectors() > 0 || getDiscoveryGroupConfiguration() != null; + } + + public ServerLocatorImpl addClusterTopologyListener(final ClusterTopologyListener listener) + { + topology.addClusterTopologyListener(listener); + return this; + } + + public void removeClusterTopologyListener(final ClusterTopologyListener listener) + { + topology.removeClusterTopologyListener(listener); + } + + private void addFactory(ClientSessionFactoryInternal factory) + { + if (factory == null) + { + return; + } + + if (isClosed()) + { + factory.close(); + return; + } + + TransportConfiguration backup = null; + + if (ha) + { + backup = topology.getBackupForConnector((Connector) factory.getConnector()); + } + + factory.setBackupConnector(factory.getConnectorConfiguration(), backup); + + synchronized (factories) + { + factories.add(factory); + } + } + + private void readObject(ObjectInputStream is) throws ClassNotFoundException, IOException + { + is.defaultReadObject(); + if (stateGuard == null) + { + stateGuard = new String(); + } + if (topologyArrayGuard == null) + { + topologyArrayGuard = new String(); + } + } + + private final class StaticConnector implements Serializable + { + private static 
final long serialVersionUID = 6772279632415242634L; + + private List<Connector> connectors; + + public ClientSessionFactory connect(boolean skipWarnings) throws HornetQException + { + assertOpen(); + + initialise(); + + ClientSessionFactory csf = null; + + createConnectors(); + + try + { + + int retryNumber = 0; + while (csf == null && !isClosed()) + { + retryNumber++; + for (Connector conn : connectors) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug(this + "::Submitting connect towards " + conn); + } + + csf = conn.tryConnect(); + + if (csf != null) + { + csf.getConnection().addFailureListener(new FailureListener() + { + // Case the node where the cluster connection was connected is gone, we need to restart the + // connection + @Override + public void connectionFailed(HornetQException exception, boolean failedOver) + { + if (clusterConnection && exception.getType() == HornetQExceptionType.DISCONNECTED) + { + try + { + ServerLocatorImpl.this.start(startExecutor); + } + catch (Exception e) + { + // There isn't much to be done if this happens here + HornetQClientLogger.LOGGER.errorStartingLocator(e); + } + } + } + + @Override + public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID) + { + connectionFailed(me, failedOver); + } + + @Override + public String toString() + { + return "FailureListener('restarts cluster connections')"; + } + }); + + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("Returning " + csf + + " after " + + retryNumber + + " retries on StaticConnector " + + ServerLocatorImpl.this); + } + + return csf; + } + } + + if (initialConnectAttempts >= 0 && retryNumber > initialConnectAttempts) + { + break; + } + + if (latch.await(retryInterval, TimeUnit.MILLISECONDS)) + return null; + } + + } + catch (RejectedExecutionException e) + { + if (isClosed() || skipWarnings) + return null; + 
HornetQClientLogger.LOGGER.debug("Rejected execution", e); + throw e; + } + catch (Exception e) + { + if (isClosed() || skipWarnings) + return null; + HornetQClientLogger.LOGGER.errorConnectingToNodes(e); + throw HornetQClientMessageBundle.BUNDLE.cannotConnectToStaticConnectors(e); + } + + if (isClosed() || skipWarnings) + { + return null; + } + + HornetQClientLogger.LOGGER.errorConnectingToNodes(traceException); + throw HornetQClientMessageBundle.BUNDLE.cannotConnectToStaticConnectors2(); + } + + private synchronized void createConnectors() + { + if (connectors != null) + { + for (Connector conn : connectors) + { + if (conn != null) + { + conn.disconnect(); + } + } + } + connectors = new ArrayList<Connector>(); + if (initialConnectors != null) + { + for (TransportConfiguration initialConnector : initialConnectors) + { + ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this, + initialConnector, + callTimeout, + callFailoverTimeout, + clientFailureCheckPeriod, + connectionTTL, + retryInterval, + retryIntervalMultiplier, + maxRetryInterval, + reconnectAttempts, + threadPool, + scheduledThreadPool, + incomingInterceptors, + outgoingInterceptors); + + factory.disableFinalizeCheck(); + + connectors.add(new Connector(initialConnector, factory)); + } + } + } + + public synchronized void disconnect() + { + if (connectors != null) + { + for (Connector connector : connectors) + { + connector.disconnect(); + } + } + } + + @Override + protected void finalize() throws Throwable + { + if (!isClosed() && finalizeCheck) + { + HornetQClientLogger.LOGGER.serverLocatorNotClosed(traceException, System.identityHashCode(this)); + + if (ServerLocatorImpl.finalizeCallback != null) + { + ServerLocatorImpl.finalizeCallback.run(); + } + + close(); + } + + super.finalize(); + } + + private final class Connector + { + private final TransportConfiguration initialConnector; + + private volatile ClientSessionFactoryInternal factory; + + public 
Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory) + { + this.initialConnector = initialConnector; + this.factory = factory; + } + + public ClientSessionFactory tryConnect() throws HornetQException + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug(this + "::Trying to connect to " + factory); + } + try + { + ClientSessionFactoryInternal factoryToUse = factory; + if (factoryToUse != null) + { + addToConnecting(factoryToUse); + + try + { + factoryToUse.connect(1, false); + } + finally + { + removeFromConnecting(factoryToUse); + } + } + return factoryToUse; + } + catch (HornetQException e) + { + HornetQClientLogger.LOGGER.debug(this + "::Exception on establish connector initial connection", e); + return null; + } + } + + public void disconnect() + { + if (factory != null) + { + factory.causeExit(); + factory.cleanup(); + factory = null; + } + } + + @Override + public String toString() + { + return "Connector [initialConnector=" + initialConnector + "]"; + } + + } + } + + private void assertOpen() + { + synchronized (stateGuard) + { + if (state != null && state != STATE.INITIALIZED) + { + throw new IllegalStateException("Server locator is closed (maybe it was garbage collected)"); + } + } + } + + public boolean isClosed() + { + synchronized (stateGuard) + { + return state != STATE.INITIALIZED; + } + } + + private Object writeReplace() throws ObjectStreamException + { + ServerLocatorImpl clone = new ServerLocatorImpl(this); + return clone; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorInternal.java new file mode 100644 index 0000000..4ae2497 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ServerLocatorInternal.java @@ -0,0 +1,86 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; + +/** + * Extension of {@link ServerLocator} that adds the operations declared below: starting with an + * {@link Executor}, connecting, topology up/down notifications and cluster-connection settings. + * + * @author Tim Fox + */ +public interface ServerLocatorInternal extends ServerLocator +{ + void start(Executor executor) throws Exception; + + void factoryClosed(final ClientSessionFactory factory); + + AfterConnectInternalListener getAfterConnectInternalListener(); + + ServerLocatorInternal setAfterConnectionInternalListener(AfterConnectInternalListener listener); + + /** + * Sets an identity used to better identify Cluster Connection Locators in logs, to facilitate + * debugging. + * <p> + * This method used to be on a tests-only interface; it is now part of this interface. + */ + ServerLocatorInternal setIdentity(String identity); + + ServerLocatorInternal setNodeID(String nodeID); + + String getNodeID(); + + void cleanup(); + + // Reset this Locator back as if it never received any topology + void resetToInitialConnectors(); + + ClientSessionFactoryInternal connect() throws HornetQException; + + /** + * Like {@link #connect()} but it does not log warnings if it fails to connect. 
+ * @throws HornetQException + */ + ClientSessionFactoryInternal connectNoWarnings() throws HornetQException; + + void notifyNodeUp(long uniqueEventID, String nodeID, String backupGroupName, String scaleDownGroupName, + Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last); + + /** + * + * @param uniqueEventID 0 means get the previous ID +1 + * @param nodeID + */ + void notifyNodeDown(long uniqueEventID, String nodeID); + + ServerLocatorInternal setClusterConnection(boolean clusterConnection); + + boolean isClusterConnection(); + + TransportConfiguration getClusterTransportConfiguration(); + + ServerLocatorInternal setClusterTransportConfiguration(TransportConfiguration tc); + + Topology getTopology(); + + ClientProtocolManager newProtocolManager(); + + boolean isConnectable(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/Topology.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/Topology.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/Topology.java new file mode 100644 index 0000000..deb1b36 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/Topology.java @@ -0,0 +1,546 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClusterTopologyListener; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.spi.core.remoting.Connector; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author Clebert Suconic + * Created Aug 16, 2010 + */ +public final class Topology implements Serializable +{ + + private static final long serialVersionUID = -9037171688692471371L; + + private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>(); + + private transient Executor executor = null; + + /** + * Used to debug operations. + * <p> + * Someone may argue this is not needed. But it's impossible to debug anything related to + * topology without knowing what node or what object missed a Topology update. Hence I added some + * information to locate debugging here. 
+ */ + private volatile Object owner; + + /** + * topology describes the other cluster nodes that this server knows about: + * + * keys are node IDs + * values are a pair of live/backup transport configurations + */ + private final Map<String, TopologyMemberImpl> topology = new ConcurrentHashMap<String, TopologyMemberImpl>(); + + private transient Map<String, Long> mapDelete; + + public Topology(final Object owner) + { + this.owner = owner; + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", + new Exception("trace")); + } + } + + public void setExecutor(final Executor executor) + { + this.executor = executor; + } + + /** + * It will remove all elements as if it haven't received anyone from the server. + */ + public void clear() + { + topology.clear(); + } + + public void addClusterTopologyListener(final ClusterTopologyListener listener) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + "::Adding topology listener " + listener, new Exception("Trace")); + } + synchronized (topologyListeners) + { + topologyListeners.add(listener); + } + this.sendTopology(listener); + } + + public void removeClusterTopologyListener(final ClusterTopologyListener listener) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + "::Removing topology listener " + listener, new Exception("Trace")); + } + synchronized (topologyListeners) + { + topologyListeners.remove(listener); + } + } + + /** This is called by the server when the node is activated from backup state. 
It will always succeed */ + public void updateAsLive(final String nodeId, final TopologyMemberImpl memberInput) + { + synchronized (this) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug(this + "::node " + nodeId + "=" + memberInput); + } + memberInput.setUniqueEventID(System.currentTimeMillis()); + topology.remove(nodeId); + topology.put(nodeId, memberInput); + sendMemberUp(nodeId, memberInput); + } + } + + /** + * After the node is started, it will resend the notifyLive a couple of times to avoid gossip between two servers + * @param nodeId + */ + public void resendNode(final String nodeId) + { + synchronized (this) + { + TopologyMemberImpl memberInput = topology.get(nodeId); + if (memberInput != null) + { + memberInput.setUniqueEventID(System.currentTimeMillis()); + sendMemberUp(nodeId, memberInput); + } + } + } + + /** This is called by the server when the node is activated from backup state. It will always succeed */ + public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput) + { + final String nodeId = memberInput.getNodeId(); + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput); + } + + synchronized (this) + { + TopologyMemberImpl currentMember = getMember(nodeId); + if (currentMember == null) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, + new Exception("trace")); + } + + currentMember = memberInput; + topology.put(nodeId, currentMember); + } + + TopologyMemberImpl newMember = + new TopologyMemberImpl(nodeId, currentMember.getBackupGroupName(), currentMember.getScaleDownGroupName(), currentMember.getLive(), + memberInput.getBackup()); + newMember.setUniqueEventID(System.currentTimeMillis()); + topology.remove(nodeId); + topology.put(nodeId, 
newMember); + sendMemberUp(nodeId, newMember); + + return newMember; + } + } + + /** + * @param uniqueEventID an unique identifier for when the change was made. We will use current + * time millis for starts, and a ++ of that number for shutdown. + * @param nodeId + * @param memberInput + * @return {@code true} if an update did take place. Note that backups are *always* updated. + */ + public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMemberImpl memberInput) + { + + Long deleteTme = getMapDelete().get(nodeId); + if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme) + { + HornetQClientLogger.LOGGER.debug("Update uniqueEvent=" + uniqueEventID + + ", nodeId=" + + nodeId + + ", memberInput=" + + memberInput + + " being rejected as there was a delete done after that"); + return false; + } + + synchronized (this) + { + TopologyMemberImpl currentMember = topology.get(nodeId); + + if (currentMember == null) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, + new Exception("trace")); + } + memberInput.setUniqueEventID(uniqueEventID); + topology.put(nodeId, memberInput); + sendMemberUp(nodeId, memberInput); + return true; + } + if (uniqueEventID > currentMember.getUniqueEventID()) + { + TopologyMemberImpl newMember = + new TopologyMemberImpl(nodeId, memberInput.getBackupGroupName(), memberInput.getScaleDownGroupName(), memberInput.getLive(), + memberInput.getBackup()); + + if (newMember.getLive() == null && currentMember.getLive() != null) + { + newMember.setLive(currentMember.getLive()); + } + + if (newMember.getBackup() == null && currentMember.getBackup() != null) + { + newMember.setBackup(currentMember.getBackup()); + } + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" + + currentMember + 
", memberInput=" + memberInput + "newMember=" + + newMember, + new Exception("trace")); + } + + newMember.setUniqueEventID(uniqueEventID); + topology.remove(nodeId); + topology.put(nodeId, newMember); + sendMemberUp(nodeId, newMember); + + return true; + } + /* + * always add the backup, better to try to reconnect to something thats not there then to + * not know about it at all + */ + if (currentMember.getBackup() == null && memberInput.getBackup() != null) + { + currentMember.setBackup(memberInput.getBackup()); + } + return false; + } + } + + /** + * @param nodeId + * @param memberToSend + */ + private void sendMemberUp(final String nodeId, final TopologyMemberImpl memberToSend) + { + final ArrayList<ClusterTopologyListener> copy = copyListeners(); + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements"); + } + + if (copy.size() > 0) + { + execute(new Runnable() + { + public void run() + { + for (ClusterTopologyListener listener : copy) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(Topology.this + " informing " + + listener + + " about node up = " + + nodeId + + " connector = " + + memberToSend.getConnector()); + } + + try + { + listener.nodeUP(memberToSend, false); + } + catch (Throwable e) + { + HornetQClientLogger.LOGGER.errorSendingTopology(e); + } + } + } + }); + } + } + + /** + * @return + */ + private ArrayList<ClusterTopologyListener> copyListeners() + { + ArrayList<ClusterTopologyListener> listenersCopy; + synchronized (topologyListeners) + { + listenersCopy = new ArrayList<ClusterTopologyListener>(topologyListeners); + } + return listenersCopy; + } + + boolean removeMember(final long uniqueEventID, final String nodeId) + { + TopologyMemberImpl member; + + synchronized (this) + { + member = topology.get(nodeId); + if (member != null) + { + if (member.getUniqueEventID() > uniqueEventID) + { + 
HornetQClientLogger.LOGGER.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call"); + member = null; + } + else + { + getMapDelete().put(nodeId, uniqueEventID); + member = topology.remove(nodeId); + } + } + } + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("removeMember " + this + + " removing nodeID=" + + nodeId + + ", result=" + + member + + ", size = " + + topology.size(), new Exception("trace")); + } + + if (member != null) + { + final ArrayList<ClusterTopologyListener> copy = copyListeners(); + + execute(new Runnable() + { + public void run() + { + for (ClusterTopologyListener listener : copy) + { + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace(this + " informing " + listener + " about node down = " + nodeId); + } + try + { + listener.nodeDown(uniqueEventID, nodeId); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorSendingTopologyNodedown(e); + } + } + } + }); + + } + return member != null; + } + + private void execute(final Runnable runnable) + { + if (executor != null) + { + executor.execute(runnable); + } + else + { + runnable.run(); + } + } + + public synchronized void sendTopology(final ClusterTopologyListener listener) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug(this + " is sending topology to " + listener); + } + + execute(new Runnable() + { + public void run() + { + int count = 0; + + final Map<String, TopologyMemberImpl> copy; + + synchronized (Topology.this) + { + copy = new HashMap<String, TopologyMemberImpl>(topology); + } + + for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet()) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug(Topology.this + " sending " + + entry.getKey() + + " / " + + entry.getValue().getConnector() + + " to " + + listener); + } + listener.nodeUP(entry.getValue(), ++count == 
copy.size()); + } + } + }); + } + + public synchronized TopologyMemberImpl getMember(final String nodeID) + { + return topology.get(nodeID); + } + + public synchronized TopologyMemberImpl getMember(final TransportConfiguration configuration) + { + for (TopologyMemberImpl member : topology.values()) + { + if (member.isMember(configuration)) + { + return member; + } + } + + return null; + } + + public synchronized boolean isEmpty() + { + return topology.isEmpty(); + } + + public Collection<TopologyMemberImpl> getMembers() + { + ArrayList<TopologyMemberImpl> members; + synchronized (this) + { + members = new ArrayList<TopologyMemberImpl>(topology.values()); + } + return members; + } + + synchronized int nodes() + { + int count = 0; + for (TopologyMemberImpl member : topology.values()) + { + if (member.getLive() != null) + { + count++; + } + if (member.getBackup() != null) + { + count++; + } + } + return count; + } + + public synchronized String describe() + { + return describe(""); + } + + private synchronized String describe(final String text) + { + StringBuilder desc = new StringBuilder(text + "topology on " + this + ":\n"); + for (Entry<String, TopologyMemberImpl> entry : new HashMap<String, TopologyMemberImpl>(topology).entrySet()) + { + desc.append("\t" + entry.getKey() + " => " + entry.getValue() + "\n"); + } + desc.append("\t" + "nodes=" + nodes() + "\t" + "members=" + members()); + if (topology.isEmpty()) + { + desc.append("\tEmpty"); + } + return desc.toString(); + } + + private int members() + { + return topology.size(); + } + + /** The owner exists mainly for debug purposes. 
+ * When enabling logging and tracing, the Topology updates will include the owner, what will enable to identify + * what instances are receiving the updates, what will enable better debugging.*/ + public void setOwner(final Object owner) + { + this.owner = owner; + } + + public TransportConfiguration getBackupForConnector(final Connector connector) + { + for (TopologyMemberImpl member : topology.values()) + { + if (member.getLive() != null && connector.isEquivalent(member.getLive().getParams())) + { + return member.getBackup(); + } + } + return null; + } + + @Override + public String toString() + { + if (owner == null) + { + return "Topology@" + Integer.toHexString(System.identityHashCode(this)); + } + return "Topology@" + Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner + "]"; + } + + private synchronized Map<String, Long> getMapDelete() + { + if (mapDelete == null) + { + mapDelete = new ConcurrentHashMap<String, Long>(); + } + return mapDelete; + } + +}
