http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java new file mode 100644 index 0000000..b186fe9 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/ChannelImpl.java @@ -0,0 +1,705 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl; + +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.Interceptor; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.protocol.core.Channel; +import org.apache.activemq6.core.protocol.core.ChannelHandler; +import org.apache.activemq6.core.protocol.core.CommandConfirmationHandler; +import org.apache.activemq6.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.core.protocol.core.impl.wireformat.HornetQExceptionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * A ChannelImpl + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public final class ChannelImpl implements Channel +{ + public enum CHANNEL_ID + { + /** + * Used for core protocol management. + */ + PING(0), + /** + * Session creation and attachment. + */ + SESSION(1), + /** + * Replication, i.e. for backups that do not share the journal. + */ + REPLICATION(2), + /** + * cluster used for controlling nodes in a cluster remotely + */ + CLUSTER(3), + /** + * Channels [0-9] are reserved for the system, user channels must be greater than that. + */ + USER(10); + + public final long id; + + CHANNEL_ID(long id) + { + this.id = id; + } + + protected static String idToString(long code) + { + for (CHANNEL_ID channel : EnumSet.allOf(CHANNEL_ID.class)) + { + if (channel.id == code) return channel.toString(); + } + return Long.toString(code); + } + } + + private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + + private volatile long id; + + private ChannelHandler handler; + + private Packet response; + + private final java.util.Queue<Packet> resendCache; + + private volatile int firstStoredCommandID; + + private final AtomicInteger lastConfirmedCommandID = new AtomicInteger(-1); + + private volatile CoreRemotingConnection connection; + + private volatile boolean closed; + + private final Lock lock = new ReentrantLock(); + + private final Condition sendCondition = lock.newCondition(); + + private final Condition failoverCondition = lock.newCondition(); + + private final Object sendLock = new Object(); + + private final Object sendBlockingLock = new Object(); + + private boolean failingOver; + + private final int confWindowSize; + + private int receivedBytes; + + private CommandConfirmationHandler commandConfirmationHandler; + + private volatile boolean transferring; + + private final List<Interceptor> interceptors; + + public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize, final List<Interceptor> interceptors) + { + this.connection = connection; + + this.id = id; + + this.confWindowSize = confWindowSize; + + if (confWindowSize != -1) + { + resendCache = new ConcurrentLinkedQueue<Packet>(); + } + else + { + resendCache = null; + } + + this.interceptors = interceptors; + } + + public boolean supports(final byte packetType) + { + int version = connection.getClientVersion(); + + switch (packetType) + { + case PacketImpl.CLUSTER_TOPOLOGY_V2: + return version >= 122; + case PacketImpl.DISCONNECT_CONSUMER: + return version >= 124; + case PacketImpl.CLUSTER_TOPOLOGY_V3: + return version >= 125; + case PacketImpl.DISCONNECT_V2: + return version >= 125; + default: + return true; + } + } + + public long getID() + { + return id; + } + + public int getLastConfirmedCommandID() + { + return lastConfirmedCommandID.get(); + } + + public Lock getLock() + { + return lock; + } + + public int getConfirmationWindowSize() + { + return confWindowSize; + } + + public void returnBlocking() + { + returnBlocking(null); + } + + public void returnBlocking(Throwable cause) + { + lock.lock(); + + try + { + response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall(cause)); + + sendCondition.signal(); + } + finally + { + lock.unlock(); + } + } + + public boolean sendAndFlush(final Packet packet) + { + return send(packet, true, false); + } + + public boolean send(final Packet packet) + { + return send(packet, false, false); + } + + public boolean sendBatched(final Packet packet) + { + return send(packet, false, true); + } + + public void setTransferring(boolean transferring) + { + this.transferring = transferring; + } + + // This must never called by more than one thread concurrently + public boolean send(final Packet packet, final boolean flush, final boolean batch) + { + if (invokeInterceptors(packet, interceptors, connection) != null) + { + return false; + } + + synchronized (sendLock) + { + packet.setChannelID(id); + + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id); + } + + HornetQBuffer buffer = packet.encode(connection); + + lock.lock(); + + try + { + if (failingOver) + { + // TODO - don't hardcode this timeout + try + { + failoverCondition.await(10000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + // Sanity check + if (transferring) + { + throw new IllegalStateException("Cannot send a packet while channel is doing failover"); + } + + if (resendCache != null && packet.isRequiresConfirmations()) + { + resendCache.add(packet); + } + } + finally + { + lock.unlock(); + } + + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id); + } + + + // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp + // buffer is full, preventing any incoming buffers being handled and blocking failover + connection.getTransportConnection().write(buffer, flush, batch); + + return true; + } + } + + /** + * Due to networking issues or server issues the server may take longer to answer than expected.. the client may timeout the call throwing an exception + * and the client could eventually retry another call, but the server could then answer a previous command issuing a class-cast-exception. + * The expectedPacket will be used to filter out undesirable packets that would belong to previous calls. + */ + public Packet sendBlocking(final Packet packet, byte expectedPacket) throws HornetQException + { + String interceptionResult = invokeInterceptors(packet, interceptors, connection); + + if (interceptionResult != null) + { + // if we don't throw an exception here the client might not unblock + throw HornetQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult); + } + + if (closed) + { + throw HornetQClientMessageBundle.BUNDLE.connectionDestroyed(); + } + + if (connection.getBlockingCallTimeout() == -1) + { + throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection"); + } + + // Synchronized since can't be called concurrently by more than one thread and this can occur + // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread + synchronized (sendBlockingLock) + { + packet.setChannelID(id); + + final HornetQBuffer buffer = packet.encode(connection); + + lock.lock(); + + try + { + if (failingOver) + { + try + { + if (connection.getBlockingCallFailoverTimeout() < 0) + { + while (failingOver) + { + failoverCondition.await(); + } + } + else + { + if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) + { + HornetQClientLogger.LOGGER.debug("timed-out waiting for failover condition"); + } + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + response = null; + + if (resendCache != null && packet.isRequiresConfirmations()) + { + resendCache.add(packet); + } + + connection.getTransportConnection().write(buffer, false, false); + + long toWait = connection.getBlockingCallTimeout(); + + long start = System.currentTimeMillis(); + + while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && + response.getType() != expectedPacket)) && toWait > 0) + { + try + { + sendCondition.await(toWait, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) + { + HornetQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); + } + + if (closed) + { + break; + } + + final long now = System.currentTimeMillis(); + + toWait -= now - start; + + start = now; + } + + if (response == null) + { + throw HornetQClientMessageBundle.BUNDLE.timedOutSendingPacket(packet.getType()); + } + + if (response.getType() == PacketImpl.EXCEPTION) + { + final HornetQExceptionMessage mem = (HornetQExceptionMessage) response; + + HornetQException e = mem.getException(); + + e.fillInStackTrace(); + + throw e; + } + } + finally + { + lock.unlock(); + } + + return response; + } + } + + /** + * @param packet the packet to intercept + * @return the name of the interceptor that returned <code>false</code> or <code>null</code> if no interceptors + * returned <code>false</code>. + */ + public static String invokeInterceptors(final Packet packet, final List<Interceptor> interceptors, final RemotingConnection connection) + { + if (interceptors != null) + { + for (final Interceptor interceptor : interceptors) + { + try + { + boolean callNext = interceptor.intercept(packet, connection); + + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + // use a StringBuilder for speed since this may be executed a lot + StringBuilder msg = new StringBuilder(); + msg.append("Invocation of interceptor ").append(interceptor.getClass().getName()).append(" on "). + append(packet).append(" returned ").append(callNext); + HornetQClientLogger.LOGGER.debug(msg.toString()); + } + + if (!callNext) + { + return interceptor.getClass().getName(); + } + } + catch (final Throwable e) + { + HornetQClientLogger.LOGGER.errorCallingInterceptor(e, interceptor); + } + } + } + + return null; + } + + public void setCommandConfirmationHandler(final CommandConfirmationHandler handler) + { + if (confWindowSize < 0) + { + final String msg = + "You can't set confirmationHandler on a connection with confirmation-window-size < 0." + + " Look at the documentation for more information."; + throw new IllegalStateException(msg); + } + commandConfirmationHandler = handler; + } + + public void setHandler(final ChannelHandler handler) + { + this.handler = handler; + } + + public ChannelHandler getHandler() + { + return handler; + } + + public void close() + { + if (closed) + { + return; + } + + if (!connection.isDestroyed() && !connection.removeChannel(id)) + { + throw HornetQClientMessageBundle.BUNDLE.noChannelToClose(id); + } + + if (failingOver) + { + unlock(); + } + closed = true; + } + + public void transferConnection(final CoreRemotingConnection newConnection) + { + // Needs to synchronize on the connection to make sure no packets from + // the old connection get processed after transfer has occurred + synchronized (connection.getTransferLock()) + { + connection.removeChannel(id); + + // And switch it + + final CoreRemotingConnection rnewConnection = newConnection; + + rnewConnection.putChannel(id, this); + + connection = rnewConnection; + + transferring = true; + } + } + + public void replayCommands(final int otherLastConfirmedCommandID) + { + if (resendCache != null) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id); + } + clearUpTo(otherLastConfirmedCommandID); + + for (final Packet packet : resendCache) + { + doWrite(packet); + } + } + } + + public void lock() + { + lock.lock(); + + failingOver = true; + + lock.unlock(); + } + + public void unlock() + { + lock.lock(); + + failingOver = false; + + failoverCondition.signalAll(); + + lock.unlock(); + } + + public CoreRemotingConnection getConnection() + { + return connection; + } + + // Needs to be synchronized since can be called by remoting service timer thread too for timeout flush + public synchronized void flushConfirmations() + { + if (resendCache != null && receivedBytes != 0) + { + receivedBytes = 0; + + final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get()); + + confirmed.setChannelID(id); + + doWrite(confirmed); + } + } + + public void confirm(final Packet packet) + { + if (resendCache != null && packet.isRequiresConfirmations()) + { + lastConfirmedCommandID.incrementAndGet(); + + receivedBytes += packet.getPacketSize(); + + if (receivedBytes >= confWindowSize) + { + receivedBytes = 0; + + final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get()); + + confirmed.setChannelID(id); + + doWrite(confirmed); + } + } + } + + public void clearCommands() + { + if (resendCache != null) + { + lastConfirmedCommandID.set(-1); + + firstStoredCommandID = 0; + + resendCache.clear(); + } + } + + public void handlePacket(final Packet packet) + { + if (packet.getType() == PacketImpl.PACKETS_CONFIRMED) + { + if (resendCache != null) + { + final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet; + + clearUpTo(msg.getCommandID()); + } + + if (!connection.isClient()) + { + handler.handlePacket(packet); + } + + return; + } + else + { + if (packet.isResponse()) + { + confirm(packet); + + lock.lock(); + + try + { + response = packet; + sendCondition.signal(); + } + finally + { + lock.unlock(); + } + } + else if (handler != null) + { + handler.handlePacket(packet); + } + } + } + + private void doWrite(final Packet packet) + { + final HornetQBuffer buffer = packet.encode(connection); + + connection.getTransportConnection().write(buffer, false, false); + } + + private void clearUpTo(final int lastReceivedCommandID) + { + final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; + + if (numberToClear == -1) + { + throw HornetQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID); + } + + int sizeToFree = 0; + + for (int i = 0; i < numberToClear; i++) + { + final Packet packet = resendCache.poll(); + + if (packet == null) + { + if (lastReceivedCommandID > 0) + { + HornetQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); + } + firstStoredCommandID = lastReceivedCommandID + 1; + return; + } + + if (packet.getType() != PacketImpl.PACKETS_CONFIRMED) + { + sizeToFree += packet.getPacketSize(); + } + + if (commandConfirmationHandler != null) + { + commandConfirmationHandler.commandConfirmed(packet); + } + } + + firstStoredCommandID += numberToClear; + } + + @Override + public String toString() + { + return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler=" + handler + "]"; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java new file mode 100644 index 0000000..19b48a8 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManager.java @@ -0,0 +1,613 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.core.impl; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import io.netty.channel.ChannelPipeline; +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.Interceptor; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq6.core.protocol.ClientPacketDecoder; +import org.apache.activemq6.core.protocol.core.Channel; +import org.apache.activemq6.core.protocol.core.ChannelHandler; +import org.apache.activemq6.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage_V2; +import org.apache.activemq6.core.protocol.core.impl.wireformat.Ping; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; +import org.apache.activemq6.core.remoting.impl.netty.HornetQFrameDecoder2; +import org.apache.activemq6.core.version.Version; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.spi.core.remoting.TopologyResponseHandler; +import org.apache.activemq6.spi.core.remoting.SessionContext; +import org.apache.activemq6.utils.VersionLoader; + +/** + * This class will return specific packets for different types of actions happening on a messaging protocol. + * <p/> + * This is trying to unify the Core client into multiple protocols. + * <p/> + * Returning null in certain packets means no action is taken on this specific protocol. + * <p/> + * Semantic properties could also be added to this implementation. + * <p/> + * Implementations of this class need to be stateless. + * + * @author Clebert Suconic + */ + +public class HornetQClientProtocolManager implements ClientProtocolManager +{ + private final int versionID = VersionLoader.getVersion().getIncrementingVersion(); + + private ClientSessionFactoryInternal factoryInternal; + + /** + * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch} + */ + private final Object inCreateSessionGuard = new Object(); + + /** + * Flag that tells whether we are trying to create a session. + */ + private boolean inCreateSession; + + /** + * Used to wait for the creation of a session. + */ + private CountDownLatch inCreateSessionLatch; + + protected volatile RemotingConnectionImpl connection; + + protected TopologyResponseHandler topologyResponseHandler; + + /** + * Flag that signals that the communication is closing. Causes many processes to exit. + */ + private volatile boolean alive = true; + + private final CountDownLatch waitLatch = new CountDownLatch(1); + + + public HornetQClientProtocolManager() + { + } + + public String getName() + { + return HornetQClient.DEFAULT_CORE_PROTOCOL; + } + + public void setSessionFactory(ClientSessionFactory factory) + { + this.factoryInternal = (ClientSessionFactoryInternal)factory; + } + + public ClientSessionFactory getSessionFactory() + { + return this.factoryInternal; + } + + @Override + public void addChannelHandlers(ChannelPipeline pipeline) + { + pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2()); + } + + public boolean waitOnLatch(long milliseconds) throws InterruptedException + { + return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS); + } + + public Channel getChannel0() + { + if (connection == null) + { + return null; + } + else + { + return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); + } + } + + public RemotingConnection getCurrentConnection() + { + return connection; + } + + + public Channel getChannel1() + { + if (connection == null) + { + return null; + } + else + { + return connection.getChannel(1, -1); + } + } + + public Lock lockSessionCreation() + { + try + { + Lock localFailoverLock = factoryInternal.lockFailover(); + try + { + if (connection == null) + { + return null; + } + + Lock lock = getChannel1().getLock(); + + // Lock it - this must be done while the failoverLock is held + while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS)) + { + } + + return lock; + } + finally + { + localFailoverLock.unlock(); + } + // We can now release the failoverLock + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return null; + } + } + + + public void stop() + { + alive = false; + + + synchronized (inCreateSessionGuard) + { + if (inCreateSessionLatch != null) + inCreateSessionLatch.countDown(); + } + + + Channel channel1 = getChannel1(); + if (channel1 != null) + { + channel1.returnBlocking(); + } + + waitLatch.countDown(); + + } + + public boolean isAlive() + { + return alive; + } + + + @Override + public void ping(long connectionTTL) + { + Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); + + Ping ping = new Ping(connectionTTL); + + channel.send(ping); + + connection.flush(); + } + + @Override + public void sendSubscribeTopology(final boolean isServer) + { + getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, + VersionLoader.getVersion() + .getIncrementingVersion())); + } + + @Override + public SessionContext createSessionContext(String name, String username, String password, + boolean xa, boolean autoCommitSends, boolean autoCommitAcks, + boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws HornetQException + { + for (Version clientVersion : VersionLoader.getClientVersions()) + { + try + { + return createSessionContext(clientVersion, + name, + username, + password, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + minLargeMessageSize, + confirmationWindowSize); + } + catch (HornetQException e) + { + if (e.getType() != HornetQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) + { + throw e; + } + } + } + connection.destroy(); + throw new HornetQException(HornetQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS); + } + + public SessionContext createSessionContext(Version clientVersion, String name, String username, String password, + boolean xa, boolean autoCommitSends, boolean autoCommitAcks, + boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws HornetQException + { + if (!isAlive()) + throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed(); + + Channel sessionChannel = null; + CreateSessionResponseMessage response = null; + + boolean retry; + do + { + retry = false; + + Lock lock = null; + + try + { + + lock = lockSessionCreation(); + + // We now set a flag saying createSession is executing + synchronized (inCreateSessionGuard) + { + if (!isAlive()) + throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed(); + inCreateSession = true; + inCreateSessionLatch = new CountDownLatch(1); + } + + long sessionChannelID = connection.generateChannelID(); + + Packet request = new CreateSessionMessage(name, + sessionChannelID, + clientVersion.getIncrementingVersion(), + username, + password, + minLargeMessageSize, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + confirmationWindowSize, + null); + + + try + { + // channel1 reference here has to go away + response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP); + } + catch (HornetQException cause) + { + if (!isAlive()) + throw cause; + + if (cause.getType() == HornetQExceptionType.UNBLOCKED) + { + // This means the thread was blocked on create session and failover unblocked it + // so failover could occur + + retry = true; + + continue; + } + else + { + throw cause; + } + } + + sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize); + + + } + catch (Throwable t) + { + if (lock != null) + { + lock.unlock(); + lock = null; + } + + if (t instanceof HornetQException) + { + throw (HornetQException) t; + } + else + { + throw HornetQClientMessageBundle.BUNDLE.failedToCreateSession(t); + } + } + finally + { + if (lock != null) + { + lock.unlock(); + } + + // Execution has finished so notify any failover thread that may be waiting for us to be done + inCreateSession = false; + inCreateSessionLatch.countDown(); + } + } + while (retry); + + + // these objects won't be null, otherwise it would keep retrying on the previous loop + return new HornetQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); + + } + + public boolean cleanupBeforeFailover(HornetQException cause) + { + + boolean needToInterrupt; + + CountDownLatch exitLockLatch; + Lock lock = lockSessionCreation(); + + if (lock == null) + { + return false; + } + + try + { + synchronized (inCreateSessionGuard) + { + needToInterrupt = inCreateSession; + exitLockLatch = inCreateSessionLatch; + } + } + finally + { + lock.unlock(); + } + + if (needToInterrupt) + { + forceReturnChannel1(cause); + + // Now we need to make sure that the thread has actually exited and returned it's + // connections + // before failover occurs + + while (inCreateSession && isAlive()) + { + try + { + if (exitLockLatch != null) + { + exitLockLatch.await(500, TimeUnit.MILLISECONDS); + } + } + catch (InterruptedException e1) + { + throw new HornetQInterruptedException(e1); + } + } + } + + return true; + } + + @Override + public boolean checkForFailover(String liveNodeID) throws HornetQException + { + CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID); + CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet, + PacketImpl.CHECK_FOR_FAILOVER_REPLY); + return message.isOkToFailover(); + } + + + public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, + List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, + TopologyResponseHandler topologyResponseHandler) + { + this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, + callTimeout, callFailoverTimeout, + incomingInterceptors, outgoingInterceptors); + + this.topologyResponseHandler = topologyResponseHandler; + + getChannel0().setHandler(new Channel0Handler(connection)); + + + sendHandshake(transportConnection); + + return connection; + } + + private void sendHandshake(Connection transportConnection) + { + if (transportConnection.isUsingProtocolHandling()) + { + // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling + String handshake = "HORNETQ"; + HornetQBuffer hqbuffer = connection.createBuffer(handshake.length()); + hqbuffer.writeBytes(handshake.getBytes()); + transportConnection.write(hqbuffer); + } + } + + + private class Channel0Handler implements ChannelHandler + { + private final CoreRemotingConnection conn; + + private Channel0Handler(final CoreRemotingConnection conn) + { + this.conn = conn; + } + + public void handlePacket(final Packet packet) + { + final byte type = packet.getType(); + + if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2) + { + final DisconnectMessage msg = (DisconnectMessage) packet; + String scaleDownTargetNodeID = null; + + SimpleString nodeID = msg.getNodeID(); + + if (packet instanceof DisconnectMessage_V2) + { + final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet; + scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString(); + } + + if (topologyResponseHandler != null) + topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY) + { + ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) + { + ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) + { + ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; + notifyTopologyChange(topMessage); + } + else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) + { + System.out.println("Channel0Handler.handlePacket"); + } + } + + /** + * @param topMessage + */ + private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) + { + final long eventUID; + final String backupGroupName; + final String scaleDownGroupName; + if (topMessage instanceof ClusterTopologyChangeMessage_V3) + { + eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID(); + backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName(); + scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName(); + } + else if (topMessage instanceof ClusterTopologyChangeMessage_V2) + { + eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID(); + backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName(); + scaleDownGroupName = null; + } + else + { + eventUID = System.currentTimeMillis(); + backupGroupName = null; + scaleDownGroupName = null; + } + + if (topMessage.isExit()) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down"); + } + + if (topologyResponseHandler != null) + { + topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID()); + } + } + else + { + Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair(); + if (transportConfig.getA() == null && transportConfig.getB() == null) + { + transportConfig = new Pair<>(conn.getTransportConnection() + .getConnectorConfig(), + null); + } + + if (topologyResponseHandler != null) + { + topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast()); + } + } + } + } + + protected PacketDecoder getPacketDecoder() + { + return ClientPacketDecoder.INSTANCE; + } + + private void forceReturnChannel1(HornetQException cause) + { + if (connection != null) + { + Channel channel1 = connection.getChannel(1, -1); + + if (channel1 != null) + { + channel1.returnBlocking(cause); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java new file mode 100644 index 0000000..09b91c3 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.core.impl; + +import org.apache.activemq6.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq6.spi.core.remoting.ClientProtocolManagerFactory; + +/** + * @author Clebert Suconic + */ + +public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory +{ + private static final long serialVersionUID = 1; + + private static final HornetQClientProtocolManagerFactory INSTANCE = new HornetQClientProtocolManagerFactory(); + + private HornetQClientProtocolManagerFactory() + { + } + + public static final HornetQClientProtocolManagerFactory getInstance() + { + return INSTANCE; + } + + public ClientProtocolManager newProtocolManager() + { + return new HornetQClientProtocolManager(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java new file mode 100644 index 0000000..c4967f1 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQConsumerContext.java @@ -0,0 +1,54 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.core.impl; + +import org.apache.activemq6.spi.core.remoting.ConsumerContext; + +/** + * @author Clebert Suconic + */ + +public class HornetQConsumerContext extends ConsumerContext +{ + private long id; + + public HornetQConsumerContext(long id) + { + this.id = id; + } + + public long getId() + { + return id; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HornetQConsumerContext that = (HornetQConsumerContext) o; + + if (id != that.id) return false; + + return true; + } + + @Override + public int hashCode() + { + return (int) (id ^ (id >>> 32)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java new file mode 100644 index 0000000..2ec884d --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/HornetQSessionContext.java @@ -0,0 +1,940 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.core.impl; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.client.impl.AddressQueryImpl; +import org.apache.activemq6.core.client.impl.ClientConsumerImpl; +import org.apache.activemq6.core.client.impl.ClientConsumerInternal; +import org.apache.activemq6.core.client.impl.ClientLargeMessageInternal; +import org.apache.activemq6.core.client.impl.ClientMessageInternal; +import org.apache.activemq6.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq6.core.client.impl.ClientSessionImpl; +import org.apache.activemq6.core.message.impl.MessageInternal; +import org.apache.activemq6.core.protocol.core.Channel; +import org.apache.activemq6.core.protocol.core.ChannelHandler; +import org.apache.activemq6.core.protocol.core.CommandConfirmationHandler; +import org.apache.activemq6.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.HornetQExceptionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.RollbackMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCloseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionExpireMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendLargeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXACommitMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAEndMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAForgetMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAJoinMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResumeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXARollbackMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.spi.core.remoting.SessionContext; +import org.apache.activemq6.utils.TokenBucketLimiterImpl; + +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.EXCEPTION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; + +/** + * @author Clebert Suconic + */ + +public class HornetQSessionContext extends SessionContext +{ + private final Channel sessionChannel; + private final int serverVersion; + private int confirmationWindow; + private final String name; + + + public HornetQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow) + { + super(remotingConnection); + + this.name = name; + this.sessionChannel = sessionChannel; + this.serverVersion = serverVersion; + this.confirmationWindow = confirmationWindow; + + ChannelHandler handler = new ClientSessionPacketHandler(); + sessionChannel.setHandler(handler); + + + if (confirmationWindow >= 0) + { + sessionChannel.setCommandConfirmationHandler(confirmationHandler); + } + } + + + private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() + { + public void commandConfirmed(final Packet packet) + { + if (packet.getType() == PacketImpl.SESS_SEND) + { + SessionSendMessage ssm = (SessionSendMessage) packet; + callSendAck(ssm.getHandler(), ssm.getMessage()); + } + else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) + { + SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; + if (!scm.isContinues()) + { + callSendAck(scm.getHandler(), scm.getMessage()); + } + } + } + + private void callSendAck(SendAcknowledgementHandler handler, final Message message) + { + if (handler != null) + { + handler.sendAcknowledged(message); + } + else if (sendAckHandler != null) + { + sendAckHandler.sendAcknowledged(message); + } + } + + }; + + + // Failover utility methods + + @Override + public void returnBlocking(HornetQException cause) + { + sessionChannel.returnBlocking(cause); + } + + @Override + public void lockCommunications() + { + sessionChannel.lock(); + } + + @Override + public void releaseCommunications() + { + sessionChannel.setTransferring(false); + sessionChannel.unlock(); + } + + public void cleanup() + { + sessionChannel.close(); + + // if the server is sending a disconnect + // any pending blocked operation could hang without this + sessionChannel.returnBlocking(); + } + + @Override + public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) + { + // nothing to be done here... Flow control here is done on the core side + } + + + public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) + { + sessionChannel.setCommandConfirmationHandler(confirmationHandler); + this.sendAckHandler = handler; + } + + public void createSharedQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable) throws HornetQException + { + sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE); + } + + public void deleteQueue(final SimpleString queueName) throws HornetQException + { + sessionChannel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE); + } + + public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws HornetQException + { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + return response.toQueueQuery(); + + } + + + public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, + int windowSize, int maxRate, int ackBatchSize, boolean browseOnly, + Executor executor, Executor flowControlExecutor) throws HornetQException + { + long consumerID = idGenerator.generateID(); + + HornetQConsumerContext consumerContext = new HornetQConsumerContext(consumerID); + + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, + queueName, + filterString, + browseOnly, + true); + + SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + // The actual windows size that gets used is determined by the user since + // could be overridden on the queue settings + // The value we send is just a hint + + return new ClientConsumerImpl(session, + consumerContext, + queueName, + filterString, + browseOnly, + calcWindowSize(windowSize), + ackBatchSize, + maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, + false) + : null, + executor, + flowControlExecutor, + this, + queueInfo.toQueueQuery(), + lookupTCCL()); + } + + + public int getServerVersion() + { + return serverVersion; + } + + public ClientSession.AddressQuery addressQuery(final SimpleString address) throws HornetQException + { + SessionBindingQueryResponseMessage response = + (SessionBindingQueryResponseMessage) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + + return new AddressQueryImpl(response.isExists(), response.getQueueNames()); + } + + + @Override + public void closeConsumer(final ClientConsumer consumer) throws HornetQException + { + sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE); + } + + public void sendConsumerCredits(final ClientConsumer consumer, final int credits) + { + sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits)); + } + + public void forceDelivery(final ClientConsumer consumer, final long sequence) throws HornetQException + { + SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence); + sessionChannel.send(request); + } + + public void simpleCommit() throws HornetQException + { + sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); + } + + public void simpleRollback(boolean lastMessageAsDelivered) throws HornetQException + { + sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE); + } + + public void sessionStart() throws HornetQException + { + sessionChannel.send(new PacketImpl(PacketImpl.SESS_START)); + } + + public void sessionStop() throws HornetQException + { + sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE); + } + + public void addSessionMetadata(String key, String data) throws HornetQException + { + sessionChannel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE); + } + + + public void addUniqueMetaData(String key, String data) throws HornetQException + { + sessionChannel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE); + } + + public void xaCommit(Xid xid, boolean onePhase) throws XAException, HornetQException + { + SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase); + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response); + } + } + + public void xaEnd(Xid xid, int flags) throws XAException, HornetQException + { + Packet packet; + if (flags == XAResource.TMSUSPEND) + { + packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND); + } + else if (flags == XAResource.TMSUCCESS) + { + packet = new SessionXAEndMessage(xid, false); + } + else if (flags == XAResource.TMFAIL) + { + packet = new SessionXAEndMessage(xid, true); + } + else + { + throw new XAException(XAException.XAER_INVAL); + } + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + } + + + public void sendProducerCreditsMessage(final int credits, final SimpleString address) + { + sessionChannel.send(new SessionRequestProducerCreditsMessage(credits, address)); + } + + /** + * HornetQ does support large messages + * + * @return + */ + public boolean supportsLargeMessage() + { + return true; + } + + @Override + public int getCreditsOnSendingFull(MessageInternal msgI) + { + return msgI.getEncodeSize(); + } + + public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException + { + SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler); + + if (sendBlocking) + { + sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.sendBatched(packet); + } + } + + @Override + public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws HornetQException + { + SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI); + + sessionChannel.send(initialChunk); + + return msgI.getHeadersAndPropertiesEncodeSize(); + } + + @Override + public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws HornetQException + { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = + new SessionSendContinuationMessage(msgI, chunk, !lastChunk, + requiresResponse, messageBodySize, messageHandler); + + if (requiresResponse) + { + // When sending it blocking, only the last chunk will be blocking. + sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.send(chunkPacket); + } + + return chunkPacket.getPacketSize(); + } + + public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws HornetQException + { + PacketImpl messagePacket; + if (individual) + { + messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); + } + else + { + messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); + } + + if (block) + { + sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.sendBatched(messagePacket); + } + } + + public void expireMessage(final ClientConsumer consumer, Message message) throws HornetQException + { + SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID()); + + sessionChannel.send(messagePacket); + } + + + public void sessionClose() throws HornetQException + { + sessionChannel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE); + } + + public void xaForget(Xid xid) throws XAException, HornetQException + { + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + } + + public int xaPrepare(Xid xid) throws XAException, HornetQException + { + SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid); + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + else + { + return response.getResponseCode(); + } + } + + public Xid[] xaScan() throws HornetQException + { + SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP); + + List<Xid> xids = response.getXids(); + + Xid[] xidArray = xids.toArray(new Xid[xids.size()]); + + return xidArray; + } + + public void xaRollback(Xid xid, boolean wasStarted) throws HornetQException, XAException + { + SessionXARollbackMessage packet = new SessionXARollbackMessage(xid); + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + } + + public void xaStart(Xid xid, int flags) throws XAException, HornetQException + { + Packet packet; + if (flags == XAResource.TMJOIN) + { + packet = new SessionXAJoinMessage(xid); + } + else if (flags == XAResource.TMRESUME) + { + packet = new SessionXAResumeMessage(xid); + } + else if (flags == XAResource.TMNOFLAGS) + { + // Don't need to flush since the previous end will have done this + packet = new SessionXAStartMessage(xid); + } + else + { + throw new XAException(XAException.XAER_INVAL); + } + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + HornetQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode()); + throw new XAException(response.getResponseCode()); + } + } + + public boolean configureTransactionTimeout(int seconds) throws HornetQException + { + SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) sessionChannel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP); + + return response.isOK(); + } + + public int recoverSessionTimeout() throws HornetQException + { + SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP); + + return response.getTimeoutSeconds(); + } + + public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws HornetQException + { + CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true); + sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); + } + + @Override + public boolean reattachOnNewConnection(RemotingConnection newConnection) throws HornetQException + { + + this.remotingConnection = newConnection; + + sessionChannel.transferConnection((CoreRemotingConnection) newConnection); + + Packet request = new ReattachSessionMessage(name, sessionChannel.getLastConfirmedCommandID()); + + Channel channel1 = getCoreConnection().getChannel(1, -1); + + ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP); + + if (response.isReattached()) + { + if (HornetQClientLogger.LOGGER.isDebugEnabled()) + { + HornetQClientLogger.LOGGER.debug("ClientSession reattached fine, replaying commands"); + } + // The session was found on the server - we reattached transparently ok + + sessionChannel.replayCommands(response.getLastConfirmedCommandID()); + + return true; + } + else + { + + sessionChannel.clearCommands(); + + return false; + } + + } + + public void recreateSession(final String username, + final String password, + final int minLargeMessageSize, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final SimpleString defaultAddress) throws HornetQException + { + Packet createRequest = new CreateSessionMessage(name, + sessionChannel.getID(), + getServerVersion(), + username, + password, + minLargeMessageSize, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + confirmationWindow, + defaultAddress == null ? null + : defaultAddress.toString()); + boolean retry; + do + { + try + { + getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP); + retry = false; + } + catch (HornetQException e) + { + // the session was created while its server was starting, retry it: + if (e.getType() == HornetQExceptionType.SESSION_CREATION_REJECTED) + { + HornetQClientLogger.LOGGER.retryCreateSessionSeverStarting(name); + retry = true; + // sleep a little bit to avoid spinning too much + try + { + Thread.sleep(10); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + throw e; + } + } + else + { + throw e; + } + } + } + while (retry && !session.isClosing()); + } + + @Override + public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws HornetQException + { + ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); + + // We try and recreate any non durable queues, since they probably won't be there unless + // they are defined in hornetq-configuration.xml + // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover + if (!queueInfo.isDurable()) + { + CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), + queueInfo.getName(), + queueInfo.getFilterString(), + false, + queueInfo.isTemporary(), + false); + + sendPacketWithoutLock(sessionChannel, createQueueRequest); + } + + SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), + consumerInternal.getQueueName(), + consumerInternal.getFilterString(), + consumerInternal.isBrowseOnly(), + false); + + sendPacketWithoutLock(sessionChannel, createConsumerRequest); + + int clientWindowSize = consumerInternal.getClientWindowSize(); + + if (clientWindowSize != 0) + { + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), + clientWindowSize); + + sendPacketWithoutLock(sessionChannel, packet); + } + else + { + // https://jira.jboss.org/browse/HORNETQ-522 + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), + 1); + sendPacketWithoutLock(sessionChannel, packet); + } + } + + public void xaFailed(Xid xid) throws HornetQException + { + sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid)); + } + + public void restartSession() throws HornetQException + { + sendPacketWithoutLock(sessionChannel, new PacketImpl(PacketImpl.SESS_START)); + } + + @Override + public void resetMetadata(HashMap<String, String> metaDataToSend) + { + // Resetting the metadata after failover + for (Map.Entry<String, String> entries : metaDataToSend.entrySet()) + { + sendPacketWithoutLock(sessionChannel, new SessionAddMetaDataMessageV2(entries.getKey(), entries.getValue(), false)); + } + } + + + private Channel getCreateChannel() + { + return getCoreConnection().getChannel(1, -1); + } + + private CoreRemotingConnection getCoreConnection() + { + return (CoreRemotingConnection) remotingConnection; + } + + + /** + * This doesn't apply to other protocols probably, so it will be a hornetq exclusive feature + * + * @throws HornetQException + */ + private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws HornetQException + { + DisconnectConsumerMessage message = packet; + + session.handleConsumerDisconnect(new HornetQConsumerContext(message.getConsumerId())); + } + + private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception + { + ClientMessageInternal msgi = (ClientMessageInternal) messagePacket.getMessage(); + + msgi.setDeliveryCount(messagePacket.getDeliveryCount()); + + msgi.setFlowControlSize(messagePacket.getPacketSize()); + + handleReceiveMessage(new HornetQConsumerContext(messagePacket.getConsumerID()), msgi); + } + + private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception + { + ClientLargeMessageInternal clientLargeMessage = (ClientLargeMessageInternal) serverPacket.getLargeMessage(); + + clientLargeMessage.setFlowControlSize(serverPacket.getPacketSize()); + + clientLargeMessage.setDeliveryCount(serverPacket.getDeliveryCount()); + + handleReceiveLargeMessage(new HornetQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize()); + } + + + private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception + { + handleReceiveContinuation(new HornetQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(), + continuationPacket.isContinues()); + } + + + protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) + { + handleReceiveProducerCredits(message.getAddress(), message.getCredits()); + } + + + protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message) + { + handleReceiveProducerFailCredits(message.getAddress(), message.getCredits()); + } + + class ClientSessionPacketHandler implements ChannelHandler + { + + public void handlePacket(final Packet packet) + { + byte type = packet.getType(); + + try + { + switch (type) + { + case DISCONNECT_CONSUMER: + { + handleConsumerDisconnected((DisconnectConsumerMessage) packet); + break; + } + case SESS_RECEIVE_CONTINUATION: + { + handleReceiveContinuation((SessionReceiveContinuationMessage) packet); + + break; + } + case SESS_RECEIVE_MSG: + { + handleReceivedMessagePacket((SessionReceiveMessage) packet); + + break; + } + case SESS_RECEIVE_LARGE_MSG: + { + handleReceiveLargeMessage((SessionReceiveLargeMessage) packet); + + break; + } + case PacketImpl.SESS_PRODUCER_CREDITS: + { + handleReceiveProducerCredits((SessionProducerCreditsMessage) packet); + + break; + } + case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: + { + handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet); + + break; + } + case EXCEPTION: + { + // We can only log these exceptions + // maybe we should cache it on SessionContext and throw an exception on any next calls + HornetQExceptionMessage mem = (HornetQExceptionMessage) packet; + + HornetQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException()); + + break; + } + default: + { + throw new IllegalStateException("Invalid packet: " + type); + } + } + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.failedToHandlePacket(e); + } + + sessionChannel.confirm(packet); + } + } + + private long getConsumerID(ClientConsumer consumer) + { + return ((HornetQConsumerContext)consumer.getConsumerContext()).getId(); + } + + private ClassLoader lookupTCCL() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return Thread.currentThread().getContextClassLoader(); + } + }); + + } + + private int calcWindowSize(final int windowSize) + { + int clientWindowSize; + if (windowSize == -1) + { + // No flow control - buffer can increase without bound! Only use with + // caution for very fast consumers + clientWindowSize = -1; + } + else if (windowSize == 0) + { + // Slow consumer - no buffering + clientWindowSize = 0; + } + else if (windowSize == 1) + { + // Slow consumer = buffer 1 + clientWindowSize = 1; + } + else if (windowSize > 1) + { + // Client window size is half server window size + clientWindowSize = windowSize >> 1; + } + else + { + throw HornetQClientMessageBundle.BUNDLE.invalidWindowSize(windowSize); + } + + return clientWindowSize; + } + + + private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet) + { + packet.setChannelID(parameterChannel.getID()); + + Connection conn = parameterChannel.getConnection().getTransportConnection(); + + HornetQBuffer buffer = packet.encode(this.getCoreConnection()); + + conn.write(buffer, false, false); + } + + +}
