http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java new file mode 100644 index 0000000..12dc322 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerImpl.java @@ -0,0 +1,607 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.message.BodyEncoder; +import org.apache.activemq6.core.message.impl.MessageInternal; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq6.spi.core.remoting.SessionContext; +import org.apache.activemq6.utils.DeflaterReader; +import org.apache.activemq6.utils.HornetQBufferInputStream; +import org.apache.activemq6.utils.TokenBucketLimiter; +import org.apache.activemq6.utils.UUIDGenerator; + +/** + * The client-side Producer. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public class ClientProducerImpl implements ClientProducerInternal +{ + private final SimpleString address; + + private final ClientSessionInternal session; + + private final SessionContext sessionContext; + + private volatile boolean closed; + + // For rate throttling + + private final TokenBucketLimiter rateLimiter; + + private final boolean blockOnNonDurableSend; + + private final boolean blockOnDurableSend; + + private final SimpleString groupID; + + private final int minLargeMessageSize; + + private final ClientProducerCredits producerCredits; + + // Static --------------------------------------------------------------------------------------- + + // Constructors --------------------------------------------------------------------------------- + + public ClientProducerImpl(final ClientSessionInternal session, + final SimpleString address, + final TokenBucketLimiter rateLimiter, + final boolean blockOnNonDurableSend, + final boolean blockOnDurableSend, + final boolean autoGroup, + final SimpleString groupID, + final int minLargeMessageSize, + final SessionContext sessionContext) + { + this.sessionContext = sessionContext; + + this.session = session; + + this.address = address; + + this.rateLimiter = rateLimiter; + + this.blockOnNonDurableSend = blockOnNonDurableSend; + + this.blockOnDurableSend = blockOnDurableSend; + + if (autoGroup) + { + this.groupID = UUIDGenerator.getInstance().generateSimpleStringUUID(); + } + else + { + this.groupID = groupID; + } + + this.minLargeMessageSize = minLargeMessageSize; + + if (address != null) + { + producerCredits = session.getCredits(address, false); + } + else + { + producerCredits = null; + } + } + + // ClientProducer implementation ---------------------------------------------------------------- + + public SimpleString getAddress() + { + return address; + } + + public void send(final Message msg) throws HornetQException + { + checkClosed(); + + doSend(null, msg, null, false); + } + + public void send(final SimpleString address1, final Message msg) throws HornetQException + { + checkClosed(); + + doSend(address1, msg, null, false); + } + + public void send(final String address1, final Message message) throws HornetQException + { + send(SimpleString.toSimpleString(address1), message); + } + + @Override + public void send(SimpleString address1, Message message, SendAcknowledgementHandler handler) throws HornetQException + { + checkClosed(); + boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled(); + if (confirmationWindowEnabled) + { + doSend(address1, message, handler, true); + } + else + { + doSend(address1, message, null, true); + if (handler != null) + { + session.scheduleConfirmation(handler, message); + } + } + } + + @Override + public void send(Message message, SendAcknowledgementHandler handler) throws HornetQException + { + send(null, message, handler); + } + + public synchronized void close() throws HornetQException + { + if (closed) + { + return; + } + + doCleanup(); + } + + public void cleanUp() + { + if (closed) + { + return; + } + + doCleanup(); + } + + public boolean isClosed() + { + return closed; + } + + public boolean isBlockOnDurableSend() + { + return blockOnDurableSend; + } + + public boolean isBlockOnNonDurableSend() + { + return blockOnNonDurableSend; + } + + public int getMaxRate() + { + return rateLimiter == null ? -1 : rateLimiter.getRate(); + } + + // Public --------------------------------------------------------------------------------------- + + public ClientProducerCredits getProducerCredits() + { + return producerCredits; + } + + private void doCleanup() + { + if (address != null) + { + session.returnCredits(address); + } + + session.removeProducer(this); + + closed = true; + } + + private void doSend(final SimpleString address1, final Message msg, final SendAcknowledgementHandler handler, + final boolean forceAsync) throws HornetQException + { + session.startCall(); + + try + { + MessageInternal msgI = (MessageInternal) msg; + + ClientProducerCredits theCredits; + + boolean isLarge; + // a note about the second check on the writerIndexSize, + // If it's a server's message, it means this is being done through the bridge or some special consumer on the + // server's on which case we can't' convert the message into large at the servers + if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || + msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) + { + isLarge = true; + } + else + { + isLarge = false; + } + + if (address1 != null) + { + if (!isLarge) + { + session.setAddress(msg, address1); + } + else + { + msg.setAddress(address1); + } + + // Anonymous + theCredits = session.getCredits(address1, true); + } + else + { + if (!isLarge) + { + session.setAddress(msg, this.address); + } + else + { + msg.setAddress(this.address); + } + + theCredits = producerCredits; + } + + if (rateLimiter != null) + { + // Rate flow control + + rateLimiter.limit(); + } + + if (groupID != null) + { + msgI.putStringProperty(Message.HDR_GROUP_ID, groupID); + } + + final boolean sendBlockingConfig = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend; + final boolean forceAsyncOverride = handler != null; + final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride; + + session.workDone(); + + if (isLarge) + { + largeMessageSend(sendBlocking, msgI, theCredits, handler); + } + else + { + sendRegularMessage(msgI, sendBlocking, theCredits, handler); + } + } + finally + { + session.endCall(); + } + } + + private void sendRegularMessage(final MessageInternal msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws HornetQException + { + try + { + // This will block if credits are not available + + // Note, that for a large message, the encode size only includes the properties + headers + // Not the continuations, but this is ok since we are only interested in limiting the amount of + // data in *memory* and continuations go straight to the disk + + int creditSize = sessionContext.getCreditsOnSendingFull(msgI); + + theCredits.acquireCredits(creditSize); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); + } + + private void checkClosed() throws HornetQException + { + if (closed) + { + throw HornetQClientMessageBundle.BUNDLE.producerClosed(); + } + } + + // Methods to send Large Messages---------------------------------------------------------------- + + /** + * @param msgI + * @param handler + * @throws HornetQException + */ + private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, + final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException + { + int headerSize = msgI.getHeadersAndPropertiesEncodeSize(); + + if (msgI.getHeadersAndPropertiesEncodeSize() >= minLargeMessageSize) + { + throw HornetQClientMessageBundle.BUNDLE.headerSizeTooBig(headerSize); + } + + // msg.getBody() could be Null on LargeServerMessage + if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null) + { + msgI.getWholeBuffer().readerIndex(0); + } + + InputStream input; + + if (msgI.isServerMessage()) + { + largeMessageSendServer(sendBlocking, msgI, credits, handler); + } + else if ((input = msgI.getBodyInputStream()) != null) + { + largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler); + } + else + { + largeMessageSendBuffered(sendBlocking, msgI, credits, handler); + } + } + + private void sendInitialLargeMessageHeader(MessageInternal msgI, ClientProducerCredits credits) throws HornetQException + { + int creditsUsed = sessionContext.sendInitialChunkOnLargeMessage(msgI); + + // On the case of large messages we tried to send credits before but we would starve otherwise + // we may find a way to improve the logic and always acquire the credits before + // but that's the way it's been tested and been working ATM + try + { + credits.acquireCredits(creditsUsed); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + /** + * Used to send serverMessages through the bridges. No need to validate compression here since + * the message is only compressed at the client + * + * @param sendBlocking + * @param msgI + * @param handler + * @throws HornetQException + */ + private void largeMessageSendServer(final boolean sendBlocking, final MessageInternal msgI, + final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException + { + sendInitialLargeMessageHeader(msgI, credits); + + BodyEncoder context = msgI.getBodyEncoder(); + + final long bodySize = context.getLargeBodySize(); + + context.open(); + try + { + + for (int pos = 0; pos < bodySize; ) + { + final boolean lastChunk; + + final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); + + final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength); + + context.encode(bodyBuffer, chunkLength); + + pos += chunkLength; + + lastChunk = pos >= bodySize; + SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; + + int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); + + try + { + credits.acquireCredits(creditsUsed); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + finally + { + context.close(); + } + } + + /** + * @param sendBlocking + * @param msgI + * @param handler + * @throws HornetQException + */ + private void + largeMessageSendBuffered(final boolean sendBlocking, final MessageInternal msgI, + final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws HornetQException + { + msgI.getBodyBuffer().readerIndex(0); + largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits, + handler); + } + + /** + * @param sendBlocking + * @param msgI + * @param inputStreamParameter + * @param credits + * @throws HornetQException + */ + private void largeMessageSendStreamed(final boolean sendBlocking, final MessageInternal msgI, + final InputStream inputStreamParameter, final ClientProducerCredits credits, + SendAcknowledgementHandler handler) throws HornetQException + { + boolean lastPacket = false; + + InputStream input = inputStreamParameter; + + // We won't know the real size of the message since we are compressing while reading the streaming. + // This counter will be passed to the deflater to be updated for every byte read + AtomicLong messageSize = new AtomicLong(); + + DeflaterReader deflaterReader = null; + + if (session.isCompressLargeMessages()) + { + msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true); + deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); + input = deflaterReader; + } + + long totalSize = 0; + + boolean headerSent = false; + + while (!lastPacket) + { + byte[] buff = new byte[minLargeMessageSize]; + + int pos = 0; + + do + { + int numberOfBytesRead; + + int wanted = minLargeMessageSize - pos; + + try + { + numberOfBytesRead = input.read(buff, pos, wanted); + } + catch (IOException e) + { + throw HornetQClientMessageBundle.BUNDLE.errorReadingBody(e); + } + + if (numberOfBytesRead == -1) + { + lastPacket = true; + + break; + } + + pos += numberOfBytesRead; + } + while (pos < minLargeMessageSize); + + totalSize += pos; + + final SessionSendContinuationMessage chunk; + + if (lastPacket) + { + if (!session.isCompressLargeMessages()) + { + messageSize.set(totalSize); + } + + // This is replacing the last packet by a smaller packet + byte[] buff2 = new byte[pos]; + + System.arraycopy(buff, 0, buff2, 0, pos); + + buff = buff2; + + // This is the case where the message is being converted as a regular message + if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) + { + msgI.getBodyBuffer().resetReaderIndex(); + msgI.getBodyBuffer().resetWriterIndex(); + msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize()); + + msgI.getBodyBuffer().writeBytes(buff, 0, pos); + sendRegularMessage(msgI, sendBlocking, credits, handler); + return; + } + else + { + if (!headerSent) + { + headerSent = true; + sendInitialLargeMessageHeader(msgI, credits); + } + int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, handler); + try + { + credits.acquireCredits(creditsSent); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + else + { + if (!headerSent) + { + headerSent = true; + sendInitialLargeMessageHeader(msgI, credits); + } + + + int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, handler); + try + { + credits.acquireCredits(creditsSent); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + + try + { + input.close(); + } + catch (IOException e) + { + throw HornetQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java new file mode 100644 index 0000000..056c20c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientProducerInternal.java @@ -0,0 +1,29 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.client.ClientProducer; + +/** + * + * A ClientProducerInternal + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public interface ClientProducerInternal extends ClientProducer +{ + void cleanUp(); + + ClientProducerCredits getProducerCredits(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java new file mode 100644 index 0000000..eebc044 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryImpl.java @@ -0,0 +1,1609 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.client.impl; + +import java.lang.ref.WeakReference; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.HornetQNotConnectedException; +import org.apache.activemq6.api.core.Interceptor; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.FailoverEventListener; +import org.apache.activemq6.api.core.client.FailoverEventType; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq6.core.remoting.FailureListener; +import org.apache.activemq6.core.server.HornetQComponent; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.BufferHandler; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq6.spi.core.remoting.Connector; +import org.apache.activemq6.spi.core.remoting.ConnectorFactory; +import org.apache.activemq6.spi.core.remoting.TopologyResponseHandler; +import org.apache.activemq6.spi.core.remoting.SessionContext; +import org.apache.activemq6.utils.ClassloadingUtil; +import org.apache.activemq6.utils.ConcurrentHashSet; +import org.apache.activemq6.utils.ConfigurationHelper; +import org.apache.activemq6.utils.ConfirmationWindowWarning; +import org.apache.activemq6.utils.ExecutorFactory; +import org.apache.activemq6.utils.OrderedExecutorFactory; +import org.apache.activemq6.utils.UUIDGenerator; + +/** + * @author Tim Fox + * @author Clebert Suconic + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ + +public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ConnectionLifeCycleListener +{ + // Constants + // ------------------------------------------------------------------------------------ + + private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + + private static final boolean isDebug = HornetQClientLogger.LOGGER.isDebugEnabled(); + + // Attributes + // ----------------------------------------------------------------------------------- + + private final ServerLocatorInternal serverLocator; + + private final ClientProtocolManager clientProtocolManager; + + private TransportConfiguration connectorConfig; + + private TransportConfiguration backupConfig; + + private ConnectorFactory connectorFactory; + + private transient boolean finalizeCheck = true; + + private final long callTimeout; + + private final long callFailoverTimeout; + + private final long clientFailureCheckPeriod; + + private final long connectionTTL; + + private final Set<ClientSessionInternal> sessions = new HashSet<ClientSessionInternal>(); + + private final Object createSessionLock = new Object(); + + private final Lock newFailoverLock = new ReentrantLock(); + + + private final Object connectionLock = new Object(); + + private final ExecutorFactory orderedExecutorFactory; + + private final Executor threadPool; + + private final ScheduledExecutorService scheduledThreadPool; + + private final Executor closeExecutor; + + private RemotingConnection connection; + + private final long retryInterval; + + private final double retryIntervalMultiplier; // For exponential backoff + + private final long maxRetryInterval; + + private int reconnectAttempts; + + private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<SessionFailureListener>(); + + private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<FailoverEventListener>(); + + private Connector connector; + + private Future<?> pingerFuture; + private PingRunnable pingRunnable; + + + private final List<Interceptor> incomingInterceptors; + + private final List<Interceptor> outgoingInterceptors; + + private volatile boolean stopPingingAfterOne; + + private volatile boolean closed; + + public final Exception createTrace; + + public static final Set<CloseRunnable> CLOSE_RUNNABLES = Collections.synchronizedSet(new HashSet<CloseRunnable>()); + + private final ConfirmationWindowWarning confirmationWindowWarning; + + private String liveNodeID; + + public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, + final TransportConfiguration connectorConfig, + final long callTimeout, + final long callFailoverTimeout, + final long clientFailureCheckPeriod, + final long connectionTTL, + final long retryInterval, + final double retryIntervalMultiplier, + final long maxRetryInterval, + final int reconnectAttempts, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final List<Interceptor> incomingInterceptors, + final List<Interceptor> outgoingInterceptors) + { + createTrace = new Exception(); + + this.serverLocator = serverLocator; + + this.clientProtocolManager = serverLocator.newProtocolManager(); + + this.clientProtocolManager.setSessionFactory(this); + + this.connectorConfig = connectorConfig; + + connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); + + checkTransportKeys(connectorFactory, connectorConfig); + + this.callTimeout = callTimeout; + + this.callFailoverTimeout = callFailoverTimeout; + + // HORNETQ-1314 - if this in an in-vm connection then disable connection monitoring + if (connectorFactory.isReliable() && + clientFailureCheckPeriod == HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD && + connectionTTL == HornetQClient.DEFAULT_CONNECTION_TTL) + { + this.clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD_INVM; + this.connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL_INVM; + } + else + { + this.clientFailureCheckPeriod = clientFailureCheckPeriod; + + this.connectionTTL = connectionTTL; + } + + this.retryInterval = retryInterval; + + this.retryIntervalMultiplier = retryIntervalMultiplier; + + this.maxRetryInterval = maxRetryInterval; + + this.reconnectAttempts = reconnectAttempts; + + this.scheduledThreadPool = scheduledThreadPool; + + this.threadPool = threadPool; + + orderedExecutorFactory = new OrderedExecutorFactory(threadPool); + + closeExecutor = orderedExecutorFactory.getExecutor(); + + this.incomingInterceptors = incomingInterceptors; + + this.outgoingInterceptors = outgoingInterceptors; + + confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); + + } + + public void disableFinalizeCheck() + { + finalizeCheck = false; + } + + public Lock lockFailover() + { + newFailoverLock.lock(); + return newFailoverLock; + } + + public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws HornetQException + { + // Get the connection + getConnectionWithRetry(initialConnectAttempts); + + if (connection == null) + { + StringBuilder msg = + new StringBuilder("Unable to connect to server using configuration ").append(connectorConfig); + if (backupConfig != null) + { + msg.append(" and backup configuration ").append(backupConfig); + } + throw new HornetQNotConnectedException(msg.toString()); + } + + } + + public TransportConfiguration getConnectorConfiguration() + { + return connectorConfig; + } + + public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp) + { + Connector localConnector = connector; + + // if the connector has never been used (i.e. the getConnection hasn't been called yet), we will need + // to create a connector just to validate if the parameters are ok. + // so this will create the instance to be used on the isEquivalent check + if (localConnector == null) + { + localConnector = connectorFactory.createConnector(connectorConfig.getParams(), + new DelegatingBufferHandler(), + this, + closeExecutor, + threadPool, + scheduledThreadPool, + clientProtocolManager); + } + + if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Setting up backup config = " + backUp + " for live = " + live); + } + backupConfig = backUp; + } + else + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + + " / " + + backUp + + " but it didn't belong to " + + connectorConfig); + } + } + } + + public Object getBackupConnector() + { + return backupConfig; + } + + public ClientSession createSession(final String username, + final String password, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final int ackBatchSize) throws HornetQException + { + return createSessionInternal(username, + password, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + ackBatchSize); + } + + public ClientSession createSession(final boolean autoCommitSends, + final boolean autoCommitAcks, + final int ackBatchSize) throws HornetQException + { + return createSessionInternal(null, + null, + false, + autoCommitSends, + autoCommitAcks, + serverLocator.isPreAcknowledge(), + ackBatchSize); + } + + public ClientSession createXASession() throws HornetQException + { + return createSessionInternal(null, + null, + true, + false, + false, + serverLocator.isPreAcknowledge(), + serverLocator.getAckBatchSize()); + } + + public ClientSession createTransactedSession() throws HornetQException + { + return createSessionInternal(null, + null, + false, + false, + false, + serverLocator.isPreAcknowledge(), + serverLocator.getAckBatchSize()); + } + + public ClientSession createSession() throws HornetQException + { + return createSessionInternal(null, + null, + false, + true, + true, + serverLocator.isPreAcknowledge(), + serverLocator.getAckBatchSize()); + } + + public ClientSession createSession(final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException + { + return createSessionInternal(null, + null, + false, + autoCommitSends, + autoCommitAcks, + serverLocator.isPreAcknowledge(), + serverLocator.getAckBatchSize()); + } + + public ClientSession createSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks) throws HornetQException + { + return createSessionInternal(null, + null, + xa, + autoCommitSends, + autoCommitAcks, + serverLocator.isPreAcknowledge(), + serverLocator.getAckBatchSize()); + } + + public ClientSession createSession(final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge) throws HornetQException + { + return createSessionInternal(null, + null, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + serverLocator.getAckBatchSize()); + } + + // ConnectionLifeCycleListener implementation -------------------------------------------------- + + public void connectionCreated(final HornetQComponent component, final Connection connection, final String protocol) + { + } + + public void connectionDestroyed(final Object connectionID) + { + // The exception has to be created in the same thread where it's being called + // as to avoid a different stack trace cause + final HornetQException ex = HornetQClientMessageBundle.BUNDLE.channelDisconnected(); + + // It has to use the same executor as the disconnect message is being sent through + + closeExecutor.execute(new Runnable() + { + public void run() + { + handleConnectionFailure(connectionID, ex); + } + }); + + } + + public void connectionException(final Object connectionID, final HornetQException me) + { + handleConnectionFailure(connectionID, me); + } + + // Must be synchronized to prevent it happening concurrently with failover which can lead to + // inconsistencies + public void removeSession(final ClientSessionInternal session, final boolean failingOver) + { + synchronized (sessions) + { + sessions.remove(session); + } + } + + public void connectionReadyForWrites(final Object connectionID, final boolean ready) + { + } + + public synchronized int numConnections() + { + return connection != null ? 1 : 0; + } + + public int numSessions() + { + return sessions.size(); + } + + public void addFailureListener(final SessionFailureListener listener) + { + listeners.add(listener); + } + + public boolean removeFailureListener(final SessionFailureListener listener) + { + return listeners.remove(listener); + } + + public ClientSessionFactoryImpl addFailoverListener(FailoverEventListener listener) + { + failoverListeners.add(listener); + return this; + } + + public boolean removeFailoverListener(FailoverEventListener listener) + { + return failoverListeners.remove(listener); + } + + public void causeExit() + { + clientProtocolManager.stop(); + } + + private void interruptConnectAndCloseAllSessions(boolean close) + { + clientProtocolManager.stop(); + + synchronized (createSessionLock) + { + closeCleanSessions(close); + closed = true; + } + } + + /** + * @param close + */ + private void closeCleanSessions(boolean close) + { + HashSet<ClientSessionInternal> sessionsToClose; + synchronized (sessions) + { + sessionsToClose = new HashSet<ClientSessionInternal>(sessions); + } + // work on a copied set. the session will be removed from sessions when session.close() is + // called + for (ClientSessionInternal session : sessionsToClose) + { + try + { + if (close) + session.close(); + else + session.cleanUp(false); + } + catch (Exception e1) + { + HornetQClientLogger.LOGGER.unableToCloseSession(e1); + } + } + checkCloseConnection(); + } + + public void close() + { + if (closed) + { + return; + } + interruptConnectAndCloseAllSessions(true); + + serverLocator.factoryClosed(this); + } + + public void cleanup() + { + if (closed) + { + return; + } + + interruptConnectAndCloseAllSessions(false); + } + + public boolean isClosed() + { + return closed || serverLocator.isClosed(); + } + + @Override + public ServerLocator getServerLocator() + { + return serverLocator; + } + + public void stopPingingAfterOne() + { + stopPingingAfterOne = true; + } + + private void handleConnectionFailure(final Object connectionID, final HornetQException me) + { + handleConnectionFailure(connectionID, me, null); + } + + private void handleConnectionFailure(final Object connectionID, final HornetQException me, String scaleDownTargetNodeID) + { + try + { + failoverOrReconnect(connectionID, me, scaleDownTargetNodeID); + } + catch (HornetQInterruptedException e1) + { + // this is just a debug, since an interrupt is an expected event (in case of a shutdown) + HornetQClientLogger.LOGGER.debug(e1.getMessage(), e1); + } + } + + /** + * TODO: Maybe this belongs to HornetQClientProtocolManager + * @param connectionID + * @param me + */ + private void failoverOrReconnect(final Object connectionID, final HornetQException me, String scaleDownTargetNodeID) + { + Set<ClientSessionInternal> sessionsToClose = null; + if (!clientProtocolManager.isAlive()) + return; + Lock localFailoverLock = lockFailover(); + try + { + if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) + { + // We already failed over/reconnected - probably the first failure came in, all the connections were failed + // over then a async connection exception or disconnect + // came in for one of the already exitLoop connections, so we return true - we don't want to call the + // listeners again + + return; + } + + if (ClientSessionFactoryImpl.isTrace) + { + HornetQClientLogger.LOGGER.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts); + } + + callFailoverListeners(FailoverEventType.FAILURE_DETECTED); + // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages + callSessionFailureListeners(me, false, false, scaleDownTargetNodeID); + + // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure + // There are either no threads executing in createSession, or one is blocking on a createSession + // result. + + // Then interrupt the channel 1 that is blocking (could just interrupt them all) + + // Then release all channel 1 locks - this allows the createSession to exit the monitor + + // Then get all channel 1 locks again - this ensures the any createSession thread has executed the section and + // returned all its connections to the connection manager (the code to return connections to connection manager + // must be inside the lock + + // Then perform failover + + // Then release failoverLock + + // The other side of the bargain - during createSession: + // The calling thread must get the failoverLock and get its' connections when this is + // locked. + // While this is still locked it must then get the channel1 lock + // It can then release the failoverLock + // It should catch HornetQException.INTERRUPTED in the call to channel.sendBlocking + // It should then return its connections, with channel 1 lock still held + // It can then release the channel 1 lock, and retry (which will cause locking on + // failoverLock + // until failover is complete + + if (reconnectAttempts != 0) + { + + + if (clientProtocolManager.cleanupBeforeFailover(me)) + { + + + // Now we absolutely know that no threads are executing in or blocked in + // createSession, + // and no + // more will execute it until failover is complete + + // So.. do failover / reconnection + + RemotingConnection oldConnection = connection; + + connection = null; + + Connector localConnector = connector; + if (localConnector != null) + { + try + { + localConnector.close(); + } + catch (Exception ignore) + { + // no-op + } + } + + cancelScheduledTasks(); + + connector = null; + + reconnectSessions(oldConnection, reconnectAttempts, me); + + if (oldConnection != null) + { + oldConnection.destroy(); + } + + if (connection != null) + { + callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED); + } + } + } + else + { + RemotingConnection connectionToDestory = connection; + if (connectionToDestory != null) + { + connectionToDestory.destroy(); + } + connection = null; + } + + if (connection == null) + { + synchronized (sessions) + { + sessionsToClose = new HashSet<ClientSessionInternal>(sessions); + } + callFailoverListeners(FailoverEventType.FAILOVER_FAILED); + callSessionFailureListeners(me, true, false); + } + } + finally + { + localFailoverLock.unlock(); + } + + // This needs to be outside the failover lock to prevent deadlock + if (connection != null) + { + callSessionFailureListeners(me, true, true); + } + if (sessionsToClose != null) + { + // If connection is null it means we didn't succeed in failing over or reconnecting + // so we close all the sessions, so they will throw exceptions when attempted to be used + + for (ClientSessionInternal session : sessionsToClose) + { + try + { + session.cleanUp(true); + } + catch (Exception cause) + { + HornetQClientLogger.LOGGER.failedToCleanupSession(cause); + } + } + } + } + + private ClientSession createSessionInternal(final String username, + final String password, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final int ackBatchSize) throws HornetQException + { + String name = UUIDGenerator.getInstance().generateStringUUID(); + + + SessionContext context = createSessionChannel(name, + username, + password, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge); + + ClientSessionInternal session = new ClientSessionImpl(this, + name, + username, + password, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + serverLocator.isBlockOnAcknowledge(), + serverLocator.isAutoGroup(), + ackBatchSize, + serverLocator.getConsumerWindowSize(), + serverLocator.getConsumerMaxRate(), + serverLocator.getConfirmationWindowSize(), + serverLocator.getProducerWindowSize(), + serverLocator.getProducerMaxRate(), + serverLocator.isBlockOnNonDurableSend(), + serverLocator.isBlockOnDurableSend(), + serverLocator.isCacheLargeMessagesClient(), + serverLocator.getMinLargeMessageSize(), + serverLocator.isCompressLargeMessage(), + serverLocator.getInitialMessagePacketSize(), + serverLocator.getGroupID(), + context, + orderedExecutorFactory.getExecutor(), + orderedExecutorFactory.getExecutor()); + + synchronized (sessions) + { + if (closed || !clientProtocolManager.isAlive()) + { + session.close(); + return null; + } + sessions.add(session); + } + + return new DelegatingSession(session); + + } + + + private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect, + final boolean failedOver) + { + callSessionFailureListeners(me, afterReconnect, failedOver, null); + } + + private void callSessionFailureListeners(final HornetQException me, final boolean afterReconnect, + final boolean failedOver, final String scaleDownTargetNodeID) + { + final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners); + + for (final SessionFailureListener listener : listenersClone) + { + try + { + if (afterReconnect) + { + listener.connectionFailed(me, failedOver, scaleDownTargetNodeID); + } + else + { + listener.beforeReconnect(me); + } + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQClientLogger.LOGGER.failedToExecuteListener(t); + } + } + } + + private void callFailoverListeners(FailoverEventType type) + { + final List<FailoverEventListener> listenersClone = new ArrayList<>(failoverListeners); + + for (final FailoverEventListener listener : listenersClone) + { + try + { + listener.failoverEvent(type); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQClientLogger.LOGGER.failedToExecuteListener(t); + } + } + } + + /* + * Re-attach sessions all pre-existing sessions to the new remoting connection + */ + private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts, final HornetQException cause) + { + HashSet<ClientSessionInternal> sessionsToFailover; + synchronized (sessions) + { + sessionsToFailover = new HashSet<ClientSessionInternal>(sessions); + } + + for (ClientSessionInternal session : sessionsToFailover) + { + session.preHandleFailover(connection); + } + + getConnectionWithRetry(reconnectAttempts); + + if (connection == null) + { + if (!clientProtocolManager.isAlive()) + HornetQClientLogger.LOGGER.failedToConnectToServer(); + + return; + } + + + + + List<FailureListener> oldListeners = oldConnection.getFailureListeners(); + + List<FailureListener> newListeners = new ArrayList<>(connection.getFailureListeners()); + + for (FailureListener listener : oldListeners) + { + // Add all apart from the old DelegatingFailureListener + if (listener instanceof DelegatingFailureListener == false) + { + newListeners.add(listener); + } + } + + connection.setFailureListeners(newListeners); + + // This used to be done inside failover + // it needs to be done on the protocol + ((CoreRemotingConnection)connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence()); + + for (ClientSessionInternal session : sessionsToFailover) + { + session.handleFailover(connection, cause); + } + } + + private void getConnectionWithRetry(final int reconnectAttempts) + { + if (!clientProtocolManager.isAlive()) + return; + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("getConnectionWithRetry::" + reconnectAttempts + + " with retryInterval = " + + retryInterval + + " multiplier = " + + retryIntervalMultiplier, new Exception("trace")); + } + + long interval = retryInterval; + + int count = 0; + + while (clientProtocolManager.isAlive()) + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts); + } + + if (getConnection() != null) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("Reconnection successful"); + } + return; + } + else + { + // Failed to get connection + + if (reconnectAttempts != 0) + { + count++; + + if (reconnectAttempts != -1 && count == reconnectAttempts) + { + if (reconnectAttempts != 1) + { + HornetQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts); + } + + return; + } + + if (ClientSessionFactoryImpl.isTrace) + { + HornetQClientLogger.LOGGER.waitingForRetry(interval, retryInterval, retryIntervalMultiplier); + } + + try + { + if (clientProtocolManager.waitOnLatch(interval)) + { + return; + } + } + catch (InterruptedException ignore) + { + throw new HornetQInterruptedException(createTrace); + } + + // Exponential back-off + long newInterval = (long) (interval * retryIntervalMultiplier); + + if (newInterval > maxRetryInterval) + { + newInterval = maxRetryInterval; + } + + interval = newInterval; + } + else + { + HornetQClientLogger.LOGGER.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); + return; + } + } + } + } + + private void cancelScheduledTasks() + { + Future<?> pingerFutureLocal = pingerFuture; + if (pingerFutureLocal != null) + { + pingerFutureLocal.cancel(false); + } + PingRunnable pingRunnableLocal = pingRunnable; + if (pingRunnableLocal != null) + { + pingRunnableLocal.cancel(); + } + pingerFuture = null; + pingRunnable = null; + } + + private void checkCloseConnection() + { + if (connection != null && sessions.size() == 0) + { + cancelScheduledTasks(); + + try + { + connection.destroy(); + } + catch (Throwable ignore) + { + } + + connection = null; + + try + { + if (connector != null) + { + connector.close(); + } + } + catch (Throwable ignore) + { + } + + connector = null; + } + } + + public RemotingConnection getConnection() + { + if (closed) + throw new IllegalStateException("ClientSessionFactory is closed!"); + if (!clientProtocolManager.isAlive()) + return null; + synchronized (connectionLock) + { + if (connection != null) + { + // a connection already exists, so returning the same one + return connection; + } + else + { + connection = establishNewConnection(); + + //we check if we can actually connect. + // we do it here as to receive the reply connection has to be not null + if (connection != null && liveNodeID != null) + { + try + { + if (!clientProtocolManager.checkForFailover(liveNodeID)) + { + connection.destroy(); + connection = null; + } + } + catch (HornetQException e) + { + if (connection != null) + { + connection.destroy(); + connection = null; + } + } + } + + if (connection != null && serverLocator.getAfterConnectInternalListener() != null) + { + serverLocator.getAfterConnectInternalListener().onConnection(this); + } + + if (serverLocator.getTopology() != null) + { + if (connection != null) + { + if (ClientSessionFactoryImpl.isTrace) + { + HornetQClientLogger.LOGGER.trace(this + "::Subscribing Topology"); + } + clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection()); + } + } + else + { + HornetQClientLogger.LOGGER.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology")); + } + + return connection; + } + } + } + + + protected void schedulePing() + { + if (pingerFuture == null) + { + pingRunnable = new ClientSessionFactoryImpl.PingRunnable(); + + if (clientFailureCheckPeriod != -1) + { + pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ClientSessionFactoryImpl.ActualScheduledPinger(pingRunnable), + 0, + clientFailureCheckPeriod, + TimeUnit.MILLISECONDS); + } + + // To make sure the first ping will be sent + pingRunnable.send(); + } + // send a ping every time we create a new remoting connection + // to set up its TTL on the server side + else + { + pingRunnable.run(); + } + } + + + @Override + protected void finalize() throws Throwable + { + if (!closed && finalizeCheck) + { + HornetQClientLogger.LOGGER.factoryLeftOpen(createTrace, System.identityHashCode(this)); + + close(); + } + + super.finalize(); + } + + protected ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName) + { + + // Will set the instance here to avoid races where cachedFactory is set to null + ConnectorFactory cachedFactory = connectorFactory; + + // First if cachedFactory had been used already, we take it from the cache. + if (cachedFactory != null && cachedFactory.getClass().getName().equals(connectorFactoryClassName)) + { + return cachedFactory; + } + // else... we will try to instantiate a new one + + return AccessController.doPrivileged(new PrivilegedAction<ConnectorFactory>() + { + public ConnectorFactory run() + { + return (ConnectorFactory) ClassloadingUtil.newInstanceFromClassLoader(connectorFactoryClassName); + } + }); + } + + + public class CloseRunnable implements Runnable + { + private final RemotingConnection conn; + private final String scaleDownTargetNodeID; + + public CloseRunnable(RemotingConnection conn, String scaleDownTargetNodeID) + { + this.conn = conn; + this.scaleDownTargetNodeID = scaleDownTargetNodeID; + } + + // Must be executed on new thread since cannot block the Netty thread for a long time and fail + // can cause reconnect loop + public void run() + { + try + { + CLOSE_RUNNABLES.add(this); + if (scaleDownTargetNodeID == null) + { + conn.fail(HornetQClientMessageBundle.BUNDLE.disconnected()); + } + else + { + conn.fail(HornetQClientMessageBundle.BUNDLE.disconnected(), scaleDownTargetNodeID); + } + } + finally + { + CLOSE_RUNNABLES.remove(this); + } + + } + + public ClientSessionFactoryImpl stop() + { + causeExit(); + CLOSE_RUNNABLES.remove(this); + return ClientSessionFactoryImpl.this; + } + + } + + + public void setReconnectAttempts(final int attempts) + { + reconnectAttempts = attempts; + } + + public Object getConnector() + { + return connector; + } + + @Override + public ConfirmationWindowWarning getConfirmationWindowWarning() + { + return confirmationWindowWarning; + } + + protected Connection openTransportConnection(final Connector connector) + { + connector.start(); + + Connection transportConnection = connector.createConnection(); + + if (transportConnection == null) + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Connector towards " + connector + " failed"); + } + + try + { + connector.close(); + } + catch (Throwable t) + { + } + } + + return transportConnection; + } + + protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration) + { + return connectorFactory.createConnector(configuration.getParams(), + new DelegatingBufferHandler(), + this, + closeExecutor, + threadPool, + scheduledThreadPool, + clientProtocolManager); + } + + private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc) + { + if (tc.getParams() != null) + { + Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), tc.getParams().keySet()); + + if (!invalid.isEmpty()) + { + String msg = "The following keys are invalid for configuring a connector: " + + ConfigurationHelper.stringSetToCommaListString(invalid); + + throw new IllegalStateException(msg); + + } + } + } + + /** + * It will connect to either live or backup accordingly to the current configurations + * it will also switch to backup case it can't connect to live and there's a backup configured + * + * @return + */ + protected Connection createTransportConnection() + { + Connection transportConnection = null; + + try + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Trying to connect with connector = " + connectorFactory + + ", parameters = " + + connectorConfig.getParams() + + " connector = " + + connector); + } + + + Connector liveConnector = createConnector(connectorFactory, connectorConfig); + + if ((transportConnection = openTransportConnection(liveConnector)) != null) + { + // if we can't connect the connect method will return null, hence we have to try the backup + connector = liveConnector; + } + else if (backupConfig != null) + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Trying backup config = " + backupConfig); + } + + ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName()); + + Connector backupConnector = createConnector(backupConnectorFactory, backupConfig); + + transportConnection = openTransportConnection(backupConnector); + + if ((transportConnection = openTransportConnection(backupConnector)) != null) + { + /*looks like the backup is now live, let's use that*/ + + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Connected to the backup at " + backupConfig); + } + + // Switching backup as live + connector = backupConnector; + connectorConfig = backupConfig; + backupConfig = null; + connectorFactory = backupConnectorFactory; + } + else + { + if (ClientSessionFactoryImpl.isDebug) + { + HornetQClientLogger.LOGGER.debug("Backup is not active yet"); + } + } + + } + } + catch (Exception cause) + { + // Sanity catch for badly behaved remoting plugins + + HornetQClientLogger.LOGGER.createConnectorException(cause); + + if (transportConnection != null) + { + try + { + transportConnection.close(); + } + catch (Throwable t) + { + } + } + + if (connector != null) + { + try + { + connector.close(); + } + catch (Throwable t) + { + } + } + + transportConnection = null; + + connector = null; + } + + return transportConnection; + } + + private class DelegatingBufferHandler implements BufferHandler + { + public void bufferReceived(final Object connectionID, final HornetQBuffer buffer) + { + RemotingConnection theConn = connection; + + if (theConn != null && connectionID.equals(theConn.getID())) + { + theConn.bufferReceived(connectionID, buffer); + } + else + { + HornetQClientLogger.LOGGER.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); + } + } + } + + private final class DelegatingFailureListener implements FailureListener + { + private final Object connectionID; + + DelegatingFailureListener(final Object connectionID) + { + this.connectionID = connectionID; + } + + @Override + public void connectionFailed(final HornetQException me, final boolean failedOver) + { + connectionFailed(me, failedOver, null); + } + + @Override + public void connectionFailed(final HornetQException me, final boolean failedOver, String scaleDownTargetNodeID) + { + handleConnectionFailure(connectionID, me, scaleDownTargetNodeID); + } + + @Override + public String toString() + { + return DelegatingFailureListener.class.getSimpleName() + "('reconnectsOrFailover', hash=" + + super.hashCode() + ")"; + } + } + + private static final class ActualScheduledPinger implements Runnable + { + private final WeakReference<PingRunnable> pingRunnable; + + ActualScheduledPinger(final PingRunnable runnable) + { + pingRunnable = new WeakReference<PingRunnable>(runnable); + } + + public void run() + { + PingRunnable runnable = pingRunnable.get(); + + if (runnable != null) + { + runnable.run(); + } + } + + } + + private final class PingRunnable implements Runnable + { + private boolean cancelled; + + private boolean first; + + private long lastCheck = System.currentTimeMillis(); + + public synchronized void run() + { + if (cancelled || stopPingingAfterOne && !first) + { + return; + } + + first = false; + + long now = System.currentTimeMillis(); + + if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL) + { + if (!connection.checkDataReceived()) + { + + // We use a different thread to send the fail + // but the exception has to be created here to preserve the stack trace + final HornetQException me = HornetQClientMessageBundle.BUNDLE.connectionTimedOut(connection.getTransportConnection()); + + cancelled = true; + + threadPool.execute(new Runnable() + { + // Must be executed on different thread + public void run() + { + connection.fail(me); + } + }); + + return; + } + else + { + lastCheck = now; + } + } + + send(); + } + + /** + * + */ + public void send() + { + + clientProtocolManager.ping(connectionTTL); + } + + public synchronized void cancel() + { + cancelled = true; + } + } + + + protected RemotingConnection establishNewConnection() + { + Connection transportConnection = createTransportConnection(); + + if (transportConnection == null) + { + if (ClientSessionFactoryImpl.isTrace) + { + HornetQClientLogger.LOGGER.trace("Neither backup or live were active, will just give up now"); + } + return null; + } + + RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout, + callFailoverTimeout, incomingInterceptors, + outgoingInterceptors, + new SessionFactoryTopologyHandler()); + + newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID())); + + schedulePing(); + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("returning " + connection); + } + + return newConnection; + } + + + protected SessionContext createSessionChannel(final String name, + final String username, + final String password, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge) throws HornetQException + { + synchronized (createSessionLock) + { + return clientProtocolManager.createSessionContext(name, username, + password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, + serverLocator.getMinLargeMessageSize(), serverLocator.getConfirmationWindowSize()); + } + } + + @Override + public String getLiveNodeId() + { + return liveNodeID; + } + + class SessionFactoryTopologyHandler implements TopologyResponseHandler + { + + @Override + public void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID) + { + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("Disconnect being called on client:" + + " server locator = " + + serverLocator + + " notifying node " + + nodeID + + " as down", new Exception("trace")); + } + + serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID); + + closeExecutor.execute(new CloseRunnable(conn, scaleDownTargetNodeID)); + + } + + @Override + public void notifyNodeUp(long uniqueEventID, String nodeID, String backupGroupName, String scaleDownGroupName, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean isLast) + { + // if it is our connector then set the live id used for failover + if (connectorPair.getA() != null && connectorPair.getA().equals(connectorConfig)) + { + liveNodeID = nodeID; + } + serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast); + } + + @Override + public void notifyNodeDown(long eventTime, String nodeID) + { + serverLocator.notifyNodeDown(eventTime, nodeID); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java new file mode 100644 index 0000000..3ddfde5 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/ClientSessionFactoryInternal.java @@ -0,0 +1,63 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.client.impl; + +import java.util.concurrent.locks.Lock; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.utils.ConfirmationWindowWarning; + +/** + * A ClientSessionFactoryInternal + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + * + */ +public interface ClientSessionFactoryInternal extends ClientSessionFactory +{ + void causeExit(); + + void addFailureListener(SessionFailureListener listener); + + boolean removeFailureListener(SessionFailureListener listener); + + void disableFinalizeCheck(); + + String getLiveNodeId(); + + // for testing + + int numConnections(); + + int numSessions(); + + void removeSession(final ClientSessionInternal session, boolean failingOver); + + void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws HornetQException; + + void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp); + + Object getConnector(); + + Object getBackupConnector(); + + void setReconnectAttempts(int i); + + ConfirmationWindowWarning getConfirmationWindowWarning(); + + Lock lockFailover(); +}
