http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java index 71fedc1..cb68c47 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageImpl.java @@ -20,11 +20,13 @@ import java.nio.ByteBuffer; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.HornetQPropertyConversionException; import org.hornetq.api.core.Message; import org.hornetq.api.core.SimpleString; import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.message.BodyEncoder; import org.hornetq.core.message.impl.MessageImpl; +import org.hornetq.reader.MessageUtil; /** * @@ -37,7 +39,8 @@ import org.hornetq.core.message.impl.MessageImpl; public class ClientMessageImpl extends MessageImpl implements ClientMessageInternal { // added this constant here so that the client package have no dependency on JMS - public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo"); + public static final SimpleString REPLYTO_HEADER_NAME = MessageUtil.REPLYTO_HEADER_NAME; + private int deliveryCount; @@ -68,42 +71,54 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter super(type, durable, expiration, timestamp, priority, initialMessageBufferSize); } + @Override public boolean isServerMessage() { return false; } + @Override public void onReceipt(final ClientConsumerInternal consumer) { this.consumer = consumer; } - public void setDeliveryCount(final int deliveryCount) + @Override + public ClientMessageImpl setDeliveryCount(final int deliveryCount) { this.deliveryCount = deliveryCount; + return this; } + @Override public int getDeliveryCount() { return deliveryCount; } - public void acknowledge() throws HornetQException + @Override + public ClientMessageImpl acknowledge() throws HornetQException { if (consumer != null) { consumer.acknowledge(this); } + + return this; } - public void individualAcknowledge() throws HornetQException + @Override + public ClientMessageImpl individualAcknowledge() throws HornetQException { if (consumer != null) { consumer.individualAcknowledge(this); } + + return this; } + @Override public int getFlowControlSize() { if (flowControlSize < 0) @@ -113,6 +128,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter return flowControlSize; } + @Override public void setFlowControlSize(final int flowControlSize) { this.flowControlSize = flowControlSize; @@ -121,16 +137,19 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter /** * @return the largeMessage */ + @Override public boolean isLargeMessage() { return false; } + @Override public boolean isCompressed() { return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED); } + @Override public int getBodySize() { return buffer.writerIndex() - buffer.readerIndex(); @@ -159,9 +178,10 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter } @Override - public void setOutputStream(final OutputStream out) throws HornetQException + public ClientMessageImpl setOutputStream(final OutputStream out) throws HornetQException { saveToOutputStream(out); + return this; } @Override @@ -178,6 +198,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter /** * @return the bodyInputStream */ + @Override public InputStream getBodyInputStream() { return bodyInputStream; @@ -186,9 +207,11 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter /** * @param bodyInputStream the bodyInputStream to set */ - public void setBodyInputStream(final InputStream bodyInputStream) + @Override + public ClientMessageImpl setBodyInputStream(final InputStream bodyInputStream) { this.bodyInputStream = bodyInputStream; + return this; } @Override @@ -197,21 +220,168 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter return new DecodingContext(); } + @Override + public ClientMessageImpl putBooleanProperty(final SimpleString key, final boolean value) + { + return (ClientMessageImpl) super.putBooleanProperty(key, value); + } + + @Override + public ClientMessageImpl putByteProperty(final SimpleString key, final byte value) + { + return (ClientMessageImpl) super.putByteProperty(key, value); + } + + @Override + public ClientMessageImpl putBytesProperty(final SimpleString key, final byte[] value) + { + return (ClientMessageImpl) super.putBytesProperty(key, value); + } + + @Override + public ClientMessageImpl putCharProperty(SimpleString key, char value) + { + return (ClientMessageImpl) super.putCharProperty(key, value); + } + + @Override + public ClientMessageImpl putCharProperty(String key, char value) + { + return (ClientMessageImpl) super.putCharProperty(key, value); + } + + @Override + public ClientMessageImpl putShortProperty(final SimpleString key, final short value) + { + return (ClientMessageImpl) super.putShortProperty(key, value); + } + + @Override + public ClientMessageImpl putIntProperty(final SimpleString key, final int value) + { + return (ClientMessageImpl) super.putIntProperty(key, value); + } + + @Override + public ClientMessageImpl putLongProperty(final SimpleString key, final long value) + { + return (ClientMessageImpl) super.putLongProperty(key, value); + } + + @Override + public ClientMessageImpl putFloatProperty(final SimpleString key, final float value) + { + return (ClientMessageImpl) super.putFloatProperty(key, value); + } + + @Override + public ClientMessageImpl putDoubleProperty(final SimpleString key, final double value) + { + return (ClientMessageImpl) super.putDoubleProperty(key, value); + } + + @Override + public ClientMessageImpl putStringProperty(final SimpleString key, final SimpleString value) + { + return (ClientMessageImpl) super.putStringProperty(key, value); + } + + @Override + public ClientMessageImpl putObjectProperty(final SimpleString key, final Object value) throws HornetQPropertyConversionException + { + return (ClientMessageImpl) super.putObjectProperty(key, value); + } + + @Override + public ClientMessageImpl putObjectProperty(final String key, final Object value) throws HornetQPropertyConversionException + { + return (ClientMessageImpl) super.putObjectProperty(key, value); + } + + @Override + public ClientMessageImpl putBooleanProperty(final String key, final boolean value) + { + return (ClientMessageImpl) super.putBooleanProperty(key, value); + } + + @Override + public ClientMessageImpl putByteProperty(final String key, final byte value) + { + return (ClientMessageImpl) super.putByteProperty(key, value); + } + + @Override + public ClientMessageImpl putBytesProperty(final String key, final byte[] value) + { + return (ClientMessageImpl) super.putBytesProperty(key, value); + } + + @Override + public ClientMessageImpl putShortProperty(final String key, final short value) + { + return (ClientMessageImpl) super.putShortProperty(key, value); + } + + @Override + public ClientMessageImpl putIntProperty(final String key, final int value) + { + return (ClientMessageImpl) super.putIntProperty(key, value); + } + + @Override + public ClientMessageImpl putLongProperty(final String key, final long value) + { + return (ClientMessageImpl) super.putLongProperty(key, value); + } + + @Override + public ClientMessageImpl putFloatProperty(final String key, final float value) + { + return (ClientMessageImpl) super.putFloatProperty(key, value); + } + + @Override + public ClientMessageImpl putDoubleProperty(final String key, final double value) + { + return (ClientMessageImpl) super.putDoubleProperty(key, value); + } + + @Override + public ClientMessageImpl putStringProperty(final String key, final String value) + { + return (ClientMessageImpl) super.putStringProperty(key, value); + } + + @Override + public ClientMessageImpl writeBodyBufferBytes(byte[] bytes) + { + return (ClientMessageImpl) super.writeBodyBufferBytes(bytes); + } + + @Override + public ClientMessageImpl writeBodyBufferString(String string) + { + return (ClientMessageImpl) super.writeBodyBufferString(string); + } + private final class DecodingContext implements BodyEncoder { public DecodingContext() { } + @Override public void open() { getBodyBuffer().readerIndex(0); } + @Override public void close() { } + @Override public long getLargeBodySize() { if (isLargeMessage()) @@ -224,12 +394,14 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter } } + @Override public int encode(final ByteBuffer bufferRead) throws HornetQException { HornetQBuffer buffer1 = HornetQBuffers.wrappedBuffer(bufferRead); return encode(buffer1, bufferRead.capacity()); } + @Override public int encode(final HornetQBuffer bufferOut, final int size) { byte[] bytes = new byte[size];
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java index 132be0f..2a1e291 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientMessageInternal.java @@ -13,8 +13,8 @@ package org.hornetq.core.client.impl; +import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.ClientMessage; -import org.hornetq.core.message.impl.MessageInternal; import org.hornetq.utils.TypedProperties; /** @@ -22,7 +22,7 @@ import org.hornetq.utils.TypedProperties; * * @author <a href="mailto:[email protected]">Tim Fox</a> */ -public interface ClientMessageInternal extends ClientMessage, MessageInternal +public interface ClientMessageInternal extends ClientMessage { TypedProperties getProperties(); @@ -33,6 +33,8 @@ public interface ClientMessageInternal extends ClientMessage, MessageInternal /** Size used for FlowControl */ void setFlowControlSize(int flowControlSize); + void setAddressTransient(SimpleString address); + void onReceipt(ClientConsumerInternal consumer); /** http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java index 5b12ade..156d526 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManager.java @@ -13,6 +13,7 @@ package org.hornetq.core.client.impl; import org.hornetq.api.core.SimpleString; +import org.hornetq.spi.core.remoting.SessionContext; /** * A ClientProducerCreditManager @@ -23,7 +24,7 @@ import org.hornetq.api.core.SimpleString; */ public interface ClientProducerCreditManager { - ClientProducerCredits getCredits(SimpleString address, boolean anon); + ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context); void returnCredits(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java index f9fab0d..7477543 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java @@ -17,6 +17,7 @@ import java.util.LinkedHashMap; import java.util.Map; import org.hornetq.api.core.SimpleString; +import org.hornetq.spi.core.remoting.SessionContext; /** * A ProducerCreditManager @@ -42,7 +43,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana this.windowSize = windowSize; } - public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon) + public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon, SessionContext context) { if (windowSize == -1) { @@ -84,7 +85,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana // while this is still sending requests causing a dead lock if (needInit) { - credits.init(); + credits.init(context); } return credits; @@ -202,7 +203,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana return false; } - public void init() + public void init(SessionContext ctx) { } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java index a1eeafc..f4398f1 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCredits.java @@ -13,6 +13,7 @@ package org.hornetq.core.client.impl; import org.hornetq.api.core.HornetQException; +import org.hornetq.spi.core.remoting.SessionContext; /** * A ClientProducerCredits @@ -31,7 +32,7 @@ public interface ClientProducerCredits boolean isBlocked(); - void init(); + void init(SessionContext sessionContext); void reset(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java index 4deb64d..3290f27 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java @@ -16,6 +16,7 @@ import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.SimpleString; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; +import org.hornetq.spi.core.remoting.SessionContext; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -47,6 +48,8 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits private boolean serverRespondedWithFail; + private SessionContext sessionContext; + public ClientProducerCreditsImpl(final ClientSessionInternal session, final SimpleString address, final int windowSize) @@ -62,11 +65,15 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits semaphore = new Semaphore(0, false); } - public void init() + public void init(SessionContext sessionContext) { // We initial request twice as many credits as we request in subsequent requests // This allows the producer to keep sending as more arrive, minimising pauses checkCredits(windowSize); + + this.sessionContext = sessionContext; + + this.sessionContext.linkFlowControl(address, this); } public void acquireCredits(final int credits) throws InterruptedException, HornetQException http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java index 71e6b8f..ac12955 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientProducerImpl.java @@ -326,7 +326,7 @@ public class ClientProducerImpl implements ClientProducerInternal throw new HornetQInterruptedException(e); } - sessionContext.sendFullMessage(msgI, sendBlocking, handler); + sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); } private void checkClosed() throws HornetQException http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java index c9736d5..8660a2d 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -45,8 +44,6 @@ import org.hornetq.api.core.client.SessionFailureListener; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.protocol.core.CoreRemotingConnection; -import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager; -import org.hornetq.core.protocol.core.impl.PacketDecoder; import org.hornetq.core.remoting.FailureListener; import org.hornetq.core.server.HornetQComponent; import org.hornetq.spi.core.protocol.RemotingConnection; @@ -56,7 +53,7 @@ import org.hornetq.spi.core.remoting.Connection; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.spi.core.remoting.Connector; import org.hornetq.spi.core.remoting.ConnectorFactory; -import org.hornetq.spi.core.remoting.ProtocolResponseHandler; +import org.hornetq.spi.core.remoting.TopologyResponseHandler; import org.hornetq.spi.core.remoting.SessionContext; import org.hornetq.utils.ClassloadingUtil; import org.hornetq.utils.ConcurrentHashSet; @@ -69,15 +66,11 @@ import org.hornetq.utils.UUIDGenerator; /** * @author Tim Fox * @author Clebert Suconic + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ConnectionLifeCycleListener { - - - // TODO use the factory here - protected ClientProtocolManager clientProtocolManager = new HornetQClientProtocolManager(this); - // Constants // ------------------------------------------------------------------------------------ @@ -90,6 +83,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final ServerLocatorInternal serverLocator; + private final ClientProtocolManager clientProtocolManager; + private TransportConfiguration connectorConfig; private TransportConfiguration backupConfig; @@ -159,7 +154,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private String liveNodeID; - public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, final TransportConfiguration connectorConfig, final long callTimeout, @@ -173,18 +167,21 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, final List<Interceptor> incomingInterceptors, - final List<Interceptor> outgoingInterceptors, - PacketDecoder packetDecoder) + final List<Interceptor> outgoingInterceptors) { createTrace = new Exception(); this.serverLocator = serverLocator; + this.clientProtocolManager = serverLocator.newProtocolManager(); + + this.clientProtocolManager.setSessionFactory(this); + this.connectorConfig = connectorConfig; connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); - checkTransportKeys(connectorFactory, connectorConfig.getParams()); + checkTransportKeys(connectorFactory, connectorConfig); this.callTimeout = callTimeout; @@ -227,10 +224,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); - - // TODO : Get rid of this / encapsulate it through the ClientProtocolManager (create a ExchangeServerProtocol for instance) - ((HornetQClientProtocolManager) clientProtocolManager).replacePacketDecoder(packetDecoder); - } public void disableFinalizeCheck() @@ -281,7 +274,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C this, closeExecutor, threadPool, - scheduledThreadPool); + scheduledThreadPool, + clientProtocolManager); } if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) @@ -472,9 +466,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return listeners.remove(listener); } - public void addFailoverListener(FailoverEventListener listener) + public ClientSessionFactoryImpl addFailoverListener(FailoverEventListener listener) { failoverListeners.add(listener); + return this; } public boolean removeFailoverListener(FailoverEventListener listener) @@ -645,7 +640,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C { - if (clientProtocolManager.cleanupBeforeFailover()) + if (clientProtocolManager.cleanupBeforeFailover(me)) { @@ -677,7 +672,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C connector = null; - reconnectSessions(oldConnection, reconnectAttempts); + reconnectSessions(oldConnection, reconnectAttempts, me); if (oldConnection != null) { @@ -785,8 +780,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); - context.setSession(session); - synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) @@ -859,7 +852,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C /* * Re-attach sessions all pre-existing sessions to the new remoting connection */ - private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts) + private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts, final HornetQException cause) { HashSet<ClientSessionInternal> sessionsToFailover; synchronized (sessions) @@ -906,7 +899,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C for (ClientSessionInternal session : sessionsToFailover) { - session.handleFailover(connection); + session.handleFailover(connection, cause); } } @@ -1111,25 +1104,26 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C protected void schedulePing() { - if (clientFailureCheckPeriod != -1) + if (pingerFuture == null) { - if (pingerFuture == null) - { - pingRunnable = new ClientSessionFactoryImpl.PingRunnable(); + pingRunnable = new ClientSessionFactoryImpl.PingRunnable(); + if (clientFailureCheckPeriod != -1) + { pingerFuture = scheduledThreadPool.scheduleWithFixedDelay(new ClientSessionFactoryImpl.ActualScheduledPinger(pingRunnable), 0, clientFailureCheckPeriod, TimeUnit.MILLISECONDS); - // To make sure the first ping will be sent - pingRunnable.send(); - } - // send a ping every time we create a new remoting connection - // to set up its TTL on the server side - else - { - pingRunnable.run(); } + + // To make sure the first ping will be sent + pingRunnable.send(); + } + // send a ping every time we create a new remoting connection + // to set up its TTL on the server side + else + { + pingRunnable.run(); } } @@ -1262,14 +1256,15 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C this, closeExecutor, threadPool, - scheduledThreadPool); + scheduledThreadPool, + clientProtocolManager); } - private void checkTransportKeys(final ConnectorFactory factory, final Map<String, Object> params) + private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc) { - if (params != null) + if (tc.getParams() != null) { - Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), params.keySet()); + Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), tc.getParams().keySet()); if (!invalid.isEmpty()) { @@ -1282,7 +1277,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } - /** * It will connect to either live or backup accordingly to the current configurations * it will also switch to backup case it can't connect to live and there's a backup configured @@ -1535,7 +1529,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, - new SessionFactoryProtocolHandler()); + new SessionFactoryTopologyHandler()); newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID())); @@ -1566,8 +1560,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } + @Override + public String getLiveNodeId() + { + return liveNodeID; + } - class SessionFactoryProtocolHandler implements ProtocolResponseHandler + class SessionFactoryTopologyHandler implements TopologyResponseHandler { @Override @@ -1607,5 +1606,4 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C serverLocator.notifyNodeDown(eventTime, nodeID); } } - } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java index 43afd66..e09823d 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java @@ -24,6 +24,7 @@ import org.hornetq.utils.ConfirmationWindowWarning; * A ClientSessionFactoryInternal * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> * */ public interface ClientSessionFactoryInternal extends ClientSessionFactory @@ -36,6 +37,8 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory void disableFinalizeCheck(); + String getLiveNodeId(); + // for testing int numConnections(); @@ -56,6 +59,5 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory ConfirmationWindowWarning getConfirmationWindowWarning(); - Lock lockFailover(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java index 3975223..197d80c 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java @@ -40,6 +40,7 @@ import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.remoting.FailureListener; import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.remoting.ConsumerContext; import org.hornetq.spi.core.remoting.SessionContext; import org.hornetq.utils.ConfirmationWindowWarning; import org.hornetq.utils.TokenBucketLimiterImpl; @@ -51,6 +52,7 @@ import org.hornetq.utils.XidCodecSupport; * @author <a href="mailto:[email protected]">Clebert Suconic</a> * @author <a href="mailto:[email protected]">Jeff Mesnil</a> * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public final class ClientSessionImpl implements ClientSessionInternal, FailureListener { @@ -77,7 +79,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final Set<ClientProducerInternal> producers = new HashSet<ClientProducerInternal>(); // Consumers must be an ordered map so if we fail we recreate them in the same order with the same ids - private final Map<Long, ClientConsumerInternal> consumers = new LinkedHashMap<Long, ClientConsumerInternal>(); + private final Map<ConsumerContext, ClientConsumerInternal> consumers = new LinkedHashMap<ConsumerContext, ClientConsumerInternal>(); private volatile boolean closed; @@ -223,6 +225,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi this.sessionContext = sessionContext; + sessionContext.setSession(this); + confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning(); } @@ -652,7 +656,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } - public void start() throws HornetQException + public ClientSessionImpl start() throws HornetQException { checkClosed(); @@ -667,6 +671,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi started = true; } + + return this; } public void stop() throws HornetQException @@ -721,6 +727,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return inClose; } + @Override + public String getNodeId() + { + return sessionFactory.getLiveNodeId(); + } + // ClientSessionInternal implementation // ------------------------------------------------------------ @@ -812,7 +824,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi { synchronized (consumers) { - consumers.put(consumer.getID(), consumer); + consumers.put(consumer.getConsumerContext(), consumer); } } @@ -828,7 +840,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi { synchronized (consumers) { - consumers.remove(consumer.getID()); + consumers.remove(consumer.getConsumerContext()); } } @@ -840,7 +852,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } - public void handleReceiveMessage(final long consumerID, final ClientMessageInternal message) throws Exception + public void handleReceiveMessage(final ConsumerContext consumerID, final ClientMessageInternal message) throws Exception { ClientConsumerInternal consumer = getConsumer(consumerID); @@ -850,7 +862,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } - public void handleReceiveLargeMessage(final long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + public void handleReceiveLargeMessage(final ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception { ClientConsumerInternal consumer = getConsumer(consumerID); @@ -860,7 +872,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } - public void handleReceiveContinuation(final long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception + public void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception { ClientConsumerInternal consumer = getConsumer(consumerID); @@ -871,9 +883,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } @Override - public void handleConsumerDisconnect(long consumerID) throws HornetQException + public void handleConsumerDisconnect(ConsumerContext context) throws HornetQException { - final ClientConsumerInternal consumer = getConsumer(consumerID); + final ClientConsumerInternal consumer = getConsumer(context); if (consumer != null) { @@ -944,9 +956,10 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi doCleanup(failingOver); } - public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) + public ClientSessionImpl setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { sessionContext.setSendAcknowledgementHandler(handler); + return this; } public void preHandleFailover(RemotingConnection connection) @@ -959,7 +972,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // Needs to be synchronized to prevent issues with occurring concurrently with close() - public void handleFailover(final RemotingConnection backupConnection) + public void handleFailover(final RemotingConnection backupConnection, HornetQException cause) { synchronized (this) { @@ -1008,7 +1021,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, defaultAddress); - for (Map.Entry<Long, ClientConsumerInternal> entryx : consumers.entrySet()) + for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) { ClientConsumerInternal consumerInternal = entryx.getValue(); @@ -1043,7 +1056,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi resetCreditManager = true; } - sessionContext.returnBlocking(); + sessionContext.returnBlocking(cause); } } catch (Throwable t) @@ -1137,7 +1150,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon) { - return producerCreditManager.getCredits(address, anon); + ClientProducerCredits credits = producerCreditManager.getCredits(address, anon, sessionContext); + + return credits; } public void returnCredits(final SimpleString address) @@ -1343,21 +1358,47 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi { checkXA(); - if (!(xares instanceof ClientSessionInternal)) + if (forceNotSameRM) { return false; } - if (forceNotSameRM) + ClientSessionInternal other = getSessionInternalFromXAResource(xares); + + if (other == null) { return false; } - ClientSessionInternal other = (ClientSessionInternal) xares; + String liveNodeId = sessionFactory.getLiveNodeId(); + String otherLiveNodeId = ((ClientSessionFactoryInternal) other.getSessionFactory()).getLiveNodeId(); + if (liveNodeId != null && otherLiveNodeId != null) + { + return liveNodeId.equals(otherLiveNodeId); + } + + //we shouldn't get here, live node id should always be set return sessionFactory == other.getSessionFactory(); } + private ClientSessionInternal getSessionInternalFromXAResource(final XAResource xares) + { + if (xares == null) + { + return null; + } + if (xares instanceof ClientSessionInternal) + { + return (ClientSessionInternal) xares; + } + else if (xares instanceof HornetQXAResource) + { + return getSessionInternalFromXAResource(((HornetQXAResource)xares).getResource()); + } + return null; + } + public int prepare(final Xid xid) throws XAException { checkXA(); @@ -1732,15 +1773,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } - /** - * @param consumerID - * @return - */ - private ClientConsumerInternal getConsumer(final long consumerID) + private ClientConsumerInternal getConsumer(final ConsumerContext consumerContext) { synchronized (consumers) { - ClientConsumerInternal consumer = consumers.get(consumerID); + ClientConsumerInternal consumer = consumers.get(consumerContext); return consumer; } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java index 67ea4fa..a05e6a3 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ClientSessionInternal.java @@ -19,6 +19,7 @@ import org.hornetq.api.core.client.ClientConsumer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.SendAcknowledgementHandler; import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.remoting.ConsumerContext; /** * A ClientSessionInternal @@ -49,17 +50,17 @@ public interface ClientSessionInternal extends ClientSession void removeProducer(ClientProducerInternal producer); - void handleReceiveMessage(long consumerID, ClientMessageInternal message) throws Exception; + void handleReceiveMessage(ConsumerContext consumerID, ClientMessageInternal message) throws Exception; - void handleReceiveLargeMessage(long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception; + void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception; - void handleReceiveContinuation(long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception; + void handleReceiveContinuation(ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception; - void handleConsumerDisconnect(long consumerID) throws HornetQException; + void handleConsumerDisconnect(ConsumerContext consumerContext) throws HornetQException; void preHandleFailover(RemotingConnection connection); - void handleFailover(RemotingConnection backupConnection); + void handleFailover(RemotingConnection backupConnection, HornetQException cause); RemotingConnection getConnection(); @@ -116,4 +117,5 @@ public interface ClientSessionInternal extends ClientSession boolean isClosing(); + String getNodeId(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java index f7e0c61..5b90f55 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/DelegatingSession.java @@ -29,6 +29,7 @@ import org.hornetq.api.core.client.SendAcknowledgementHandler; import org.hornetq.api.core.client.SessionFailureListener; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.remoting.ConsumerContext; import org.hornetq.utils.ConcurrentHashSet; /** @@ -38,6 +39,7 @@ import org.hornetq.utils.ConcurrentHashSet; * on GC if it has not already been closed * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class DelegatingSession implements ClientSessionInternal { @@ -388,33 +390,33 @@ public class DelegatingSession implements ClientSessionInternal session.preHandleFailover(connection); } - public void handleFailover(final RemotingConnection backupConnection) + public void handleFailover(final RemotingConnection backupConnection, HornetQException cause) { - session.handleFailover(backupConnection); + session.handleFailover(backupConnection, cause); } @Override - public void handleReceiveMessage(long consumerID, ClientMessageInternal message) throws Exception + public void handleReceiveMessage(ConsumerContext consumerID, ClientMessageInternal message) throws Exception { session.handleReceiveMessage(consumerID, message); } @Override - public void handleReceiveLargeMessage(long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + public void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception { session.handleReceiveLargeMessage(consumerID, clientLargeMessage, largeMessageSize); } @Override - public void handleReceiveContinuation(long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception + public void handleReceiveContinuation(ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception { session.handleReceiveContinuation(consumerID, chunk, flowControlSize, isContinues); } @Override - public void handleConsumerDisconnect(long consumerID) throws HornetQException + public void handleConsumerDisconnect(ConsumerContext consumerContext) throws HornetQException { - session.handleConsumerDisconnect(consumerID); + session.handleConsumerDisconnect(consumerContext); } public boolean isAutoCommitAcks() @@ -507,9 +509,10 @@ public class DelegatingSession implements ClientSessionInternal session.rollback(xid); } - public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) + public DelegatingSession setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { session.setSendAcknowledgementHandler(handler); + return this; } public boolean setTransactionTimeout(final int seconds) throws XAException @@ -522,9 +525,10 @@ public class DelegatingSession implements ClientSessionInternal session.resetIfNeeded(); } - public void start() throws HornetQException + public DelegatingSession start() throws HornetQException { session.start(); + return this; } public void start(final Xid xid, final int flags) throws XAException @@ -642,4 +646,10 @@ public class DelegatingSession implements ClientSessionInternal { session.scheduleConfirmation(handler, msg); } + + @Override + public String getNodeId() + { + return session.getNodeId(); + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java new file mode 100644 index 0000000..e278b8c --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/HornetQXAResource.java @@ -0,0 +1,20 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.client.impl; + +import javax.transaction.xa.XAResource; + +public interface HornetQXAResource extends XAResource +{ + XAResource getResource(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java index d5612a2..5aa37b8 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java @@ -53,9 +53,10 @@ import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.cluster.DiscoveryEntry; import org.hornetq.core.cluster.DiscoveryGroup; import org.hornetq.core.cluster.DiscoveryListener; -import org.hornetq.core.protocol.ClientPacketDecoder; -import org.hornetq.core.protocol.core.impl.PacketDecoder; +import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManagerFactory; import org.hornetq.core.remoting.FailureListener; +import org.hornetq.spi.core.remoting.ClientProtocolManager; +import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory; import org.hornetq.spi.core.remoting.Connector; import org.hornetq.utils.ClassloadingUtil; import org.hornetq.utils.HornetQThreadFactory; @@ -81,6 +82,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private static final long serialVersionUID = -1615857864410205260L; + + // This is the default value + private ClientProtocolManagerFactory protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance(); + private final boolean ha; private boolean finalizeCheck = true; @@ -208,12 +213,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery * If you need to, make them transient and handle the serialization yourself * */ - - /* - * we use the client decoder by default but there are times when we want to use the server packet decoder - */ - private transient PacketDecoder packetDecoder = ClientPacketDecoder.INSTANCE; - private final Exception traceException = new Exception(); // To be called when there are ServerLocator being finalized. @@ -642,6 +641,28 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + + public ClientProtocolManager newProtocolManager() + { + return getProtocolManagerFactory().newProtocolManager(); + } + + public ClientProtocolManagerFactory getProtocolManagerFactory() + { + if (protocolManagerFactory == null) + { + // this could happen over serialization from older versions + protocolManagerFactory = HornetQClientProtocolManagerFactory.getInstance(); + } + return protocolManagerFactory; + } + + public void setProtocolManagerFactory(ClientProtocolManagerFactory protocolManagerFactory) + { + this.protocolManagerFactory = protocolManagerFactory; + } + + public void disableFinalizeCheck() { finalizeCheck = false; @@ -675,9 +696,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return connect(true); } - public void setAfterConnectionInternalListener(AfterConnectInternalListener listener) + public ServerLocatorImpl setAfterConnectionInternalListener(AfterConnectInternalListener listener) { this.afterConnectListener = listener; + return this; } public AfterConnectInternalListener getAfterConnectInternalListener() @@ -736,8 +758,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery threadPool, scheduledThreadPool, incomingInterceptors, - outgoingInterceptors, - packetDecoder); + outgoingInterceptors); addToConnecting(factory); try @@ -780,8 +801,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery threadPool, scheduledThreadPool, incomingInterceptors, - outgoingInterceptors, - packetDecoder); + outgoingInterceptors); addToConnecting(factory); try @@ -873,8 +893,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery threadPool, scheduledThreadPool, incomingInterceptors, - outgoingInterceptors, - packetDecoder); + outgoingInterceptors); try { addToConnecting(factory); @@ -963,9 +982,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return cacheLargeMessagesClient; } - public void setCacheLargeMessagesClient(final boolean cached) + public ServerLocatorImpl setCacheLargeMessagesClient(final boolean cached) { cacheLargeMessagesClient = cached; + return this; } public long getClientFailureCheckPeriod() @@ -973,10 +993,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return clientFailureCheckPeriod; } - public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod) + public ServerLocatorImpl setClientFailureCheckPeriod(final long clientFailureCheckPeriod) { checkWrite(); this.clientFailureCheckPeriod = clientFailureCheckPeriod; + return this; } public long getConnectionTTL() @@ -984,10 +1005,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return connectionTTL; } - public void setConnectionTTL(final long connectionTTL) + public ServerLocatorImpl setConnectionTTL(final long connectionTTL) { checkWrite(); this.connectionTTL = connectionTTL; + return this; } public long getCallTimeout() @@ -995,10 +1017,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return callTimeout; } - public void setCallTimeout(final long callTimeout) + public ServerLocatorImpl setCallTimeout(final long callTimeout) { checkWrite(); this.callTimeout = callTimeout; + return this; } public long getCallFailoverTimeout() @@ -1006,10 +1029,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return callFailoverTimeout; } - public void setCallFailoverTimeout(long callFailoverTimeout) + public ServerLocatorImpl setCallFailoverTimeout(long callFailoverTimeout) { checkWrite(); this.callFailoverTimeout = callFailoverTimeout; + return this; } public int getMinLargeMessageSize() @@ -1017,10 +1041,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return minLargeMessageSize; } - public void setMinLargeMessageSize(final int minLargeMessageSize) + public ServerLocatorImpl setMinLargeMessageSize(final int minLargeMessageSize) { checkWrite(); this.minLargeMessageSize = minLargeMessageSize; + return this; } public int getConsumerWindowSize() @@ -1028,10 +1053,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return consumerWindowSize; } - public void setConsumerWindowSize(final int consumerWindowSize) + public ServerLocatorImpl setConsumerWindowSize(final int consumerWindowSize) { checkWrite(); this.consumerWindowSize = consumerWindowSize; + return this; } public int getConsumerMaxRate() @@ -1039,10 +1065,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return consumerMaxRate; } - public void setConsumerMaxRate(final int consumerMaxRate) + public ServerLocatorImpl setConsumerMaxRate(final int consumerMaxRate) { checkWrite(); this.consumerMaxRate = consumerMaxRate; + return this; } public int getConfirmationWindowSize() @@ -1050,10 +1077,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return confirmationWindowSize; } - public void setConfirmationWindowSize(final int confirmationWindowSize) + public ServerLocatorImpl setConfirmationWindowSize(final int confirmationWindowSize) { checkWrite(); this.confirmationWindowSize = confirmationWindowSize; + return this; } public int getProducerWindowSize() @@ -1061,10 +1089,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return producerWindowSize; } - public void setProducerWindowSize(final int producerWindowSize) + public ServerLocatorImpl setProducerWindowSize(final int producerWindowSize) { checkWrite(); this.producerWindowSize = producerWindowSize; + return this; } public int getProducerMaxRate() @@ -1072,10 +1101,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return producerMaxRate; } - public void setProducerMaxRate(final int producerMaxRate) + public ServerLocatorImpl setProducerMaxRate(final int producerMaxRate) { checkWrite(); this.producerMaxRate = producerMaxRate; + return this; } public boolean isBlockOnAcknowledge() @@ -1083,10 +1113,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return blockOnAcknowledge; } - public void setBlockOnAcknowledge(final boolean blockOnAcknowledge) + public ServerLocatorImpl setBlockOnAcknowledge(final boolean blockOnAcknowledge) { checkWrite(); this.blockOnAcknowledge = blockOnAcknowledge; + return this; } public boolean isBlockOnDurableSend() @@ -1094,10 +1125,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return blockOnDurableSend; } - public void setBlockOnDurableSend(final boolean blockOnDurableSend) + public ServerLocatorImpl setBlockOnDurableSend(final boolean blockOnDurableSend) { checkWrite(); this.blockOnDurableSend = blockOnDurableSend; + return this; } public boolean isBlockOnNonDurableSend() @@ -1105,10 +1137,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return blockOnNonDurableSend; } - public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) + public ServerLocatorImpl setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) { checkWrite(); this.blockOnNonDurableSend = blockOnNonDurableSend; + return this; } public boolean isAutoGroup() @@ -1116,10 +1149,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return autoGroup; } - public void setAutoGroup(final boolean autoGroup) + public ServerLocatorImpl setAutoGroup(final boolean autoGroup) { checkWrite(); this.autoGroup = autoGroup; + return this; } public boolean isPreAcknowledge() @@ -1127,10 +1161,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return preAcknowledge; } - public void setPreAcknowledge(final boolean preAcknowledge) + public ServerLocatorImpl setPreAcknowledge(final boolean preAcknowledge) { checkWrite(); this.preAcknowledge = preAcknowledge; + return this; } public int getAckBatchSize() @@ -1138,10 +1173,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return ackBatchSize; } - public void setAckBatchSize(final int ackBatchSize) + public ServerLocatorImpl setAckBatchSize(final int ackBatchSize) { checkWrite(); this.ackBatchSize = ackBatchSize; + return this; } public boolean isUseGlobalPools() @@ -1149,10 +1185,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return useGlobalPools; } - public void setUseGlobalPools(final boolean useGlobalPools) + public ServerLocatorImpl setUseGlobalPools(final boolean useGlobalPools) { checkWrite(); this.useGlobalPools = useGlobalPools; + return this; } public int getScheduledThreadPoolMaxSize() @@ -1160,10 +1197,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return scheduledThreadPoolMaxSize; } - public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) + public ServerLocatorImpl setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) { checkWrite(); this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize; + return this; } public int getThreadPoolMaxSize() @@ -1171,10 +1209,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return threadPoolMaxSize; } - public void setThreadPoolMaxSize(final int threadPoolMaxSize) + public ServerLocatorImpl setThreadPoolMaxSize(final int threadPoolMaxSize) { checkWrite(); this.threadPoolMaxSize = threadPoolMaxSize; + return this; } public long getRetryInterval() @@ -1182,10 +1221,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return retryInterval; } - public void setRetryInterval(final long retryInterval) + public ServerLocatorImpl setRetryInterval(final long retryInterval) { checkWrite(); this.retryInterval = retryInterval; + return this; } public long getMaxRetryInterval() @@ -1193,10 +1233,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return maxRetryInterval; } - public void setMaxRetryInterval(final long retryInterval) + public ServerLocatorImpl setMaxRetryInterval(final long retryInterval) { checkWrite(); maxRetryInterval = retryInterval; + return this; } public double getRetryIntervalMultiplier() @@ -1204,10 +1245,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return retryIntervalMultiplier; } - public void setRetryIntervalMultiplier(final double retryIntervalMultiplier) + public ServerLocatorImpl setRetryIntervalMultiplier(final double retryIntervalMultiplier) { checkWrite(); this.retryIntervalMultiplier = retryIntervalMultiplier; + return this; } public int getReconnectAttempts() @@ -1215,16 +1257,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return reconnectAttempts; } - public void setReconnectAttempts(final int reconnectAttempts) + public ServerLocatorImpl setReconnectAttempts(final int reconnectAttempts) { checkWrite(); this.reconnectAttempts = reconnectAttempts; + return this; } - public void setInitialConnectAttempts(int initialConnectAttempts) + public ServerLocatorImpl setInitialConnectAttempts(int initialConnectAttempts) { checkWrite(); this.initialConnectAttempts = initialConnectAttempts; + return this; } public int getInitialConnectAttempts() @@ -1237,10 +1281,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return this.failoverOnInitialConnection; } - public void setFailoverOnInitialConnection(final boolean failover) + public ServerLocatorImpl setFailoverOnInitialConnection(final boolean failover) { checkWrite(); this.failoverOnInitialConnection = failover; + return this; } public String getConnectionLoadBalancingPolicyClassName() @@ -1248,10 +1293,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return connectionLoadBalancingPolicyClassName; } - public void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) + public ServerLocatorImpl setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName) { checkWrite(); connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName; + return this; } public TransportConfiguration[] getStaticTransportConfigurations() @@ -1272,14 +1318,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery addIncomingInterceptor(interceptor); } - public void addIncomingInterceptor(final Interceptor interceptor) + public ServerLocatorImpl addIncomingInterceptor(final Interceptor interceptor) { incomingInterceptors.add(interceptor); + return this; } - public void addOutgoingInterceptor(final Interceptor interceptor) + public ServerLocatorImpl addOutgoingInterceptor(final Interceptor interceptor) { outgoingInterceptors.add(interceptor); + return this; } @Override @@ -1304,16 +1352,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return initialMessagePacketSize; } - public void setInitialMessagePacketSize(final int size) + public ServerLocatorImpl setInitialMessagePacketSize(final int size) { checkWrite(); initialMessagePacketSize = size; + return this; } - public void setGroupID(final String groupID) + public ServerLocatorImpl setGroupID(final String groupID) { checkWrite(); this.groupID = groupID; + return this; } public String getGroupID() @@ -1326,9 +1376,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return compressLargeMessage; } - public void setCompressLargeMessage(boolean avoid) + public ServerLocatorImpl setCompressLargeMessage(boolean avoid) { this.compressLargeMessage = avoid; + return this; } private void checkWrite() @@ -1348,14 +1399,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return initialConnectors.length; } - public void setIdentity(String identity) + public ServerLocatorImpl setIdentity(String identity) { this.identity = identity; + return this; } - public void setNodeID(String nodeID) + public ServerLocatorImpl setNodeID(String nodeID) { this.nodeID = nodeID; + return this; } public String getNodeID() @@ -1363,9 +1416,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return nodeID; } - public void setClusterConnection(boolean clusterConnection) + public ServerLocatorImpl setClusterConnection(boolean clusterConnection) { this.clusterConnection = clusterConnection; + return this; } public boolean isClusterConnection() @@ -1378,9 +1432,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return clusterTransportConfiguration; } - public void setClusterTransportConfiguration(TransportConfiguration tc) + public ServerLocatorImpl setClusterTransportConfiguration(TransportConfiguration tc) { this.clusterTransportConfiguration = tc; + return this; } @Override @@ -1742,20 +1797,15 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } @Override - public void setPacketDecoder(PacketDecoder packetDecoder) - { - this.packetDecoder = packetDecoder; - } - - @Override public boolean isConnectable() { return getNumInitialConnectors() > 0 || getDiscoveryGroupConfiguration() != null; } - public void addClusterTopologyListener(final ClusterTopologyListener listener) + public ServerLocatorImpl addClusterTopologyListener(final ClusterTopologyListener listener) { topology.addClusterTopologyListener(listener); + return this; } public void removeClusterTopologyListener(final ClusterTopologyListener listener) @@ -1802,8 +1852,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery { topologyArrayGuard = new String(); } - //is transient so need to create, for compatibility issues - packetDecoder = ClientPacketDecoder.INSTANCE; } private final class StaticConnector implements Serializable @@ -1951,8 +1999,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery threadPool, scheduledThreadPool, incomingInterceptors, - outgoingInterceptors, - packetDecoder); + outgoingInterceptors); factory.disableFinalizeCheck(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java index 40c117b..4d8c258 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java @@ -18,8 +18,8 @@ import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; -import org.hornetq.core.protocol.core.impl.PacketDecoder; import org.hornetq.api.core.Pair; +import org.hornetq.spi.core.remoting.ClientProtocolManager; /** * A ServerLocatorInternal @@ -36,14 +36,14 @@ public interface ServerLocatorInternal extends ServerLocator AfterConnectInternalListener getAfterConnectInternalListener(); - void setAfterConnectionInternalListener(AfterConnectInternalListener listener); + ServerLocatorInternal setAfterConnectionInternalListener(AfterConnectInternalListener listener); /** Used to better identify Cluster Connection Locators on logs. To facilitate eventual debugging. * * This method used to be on tests interface, but I'm now making it part of the public interface since*/ - void setIdentity(String identity); + ServerLocatorInternal setIdentity(String identity); - void setNodeID(String nodeID); + ServerLocatorInternal setNodeID(String nodeID); String getNodeID(); @@ -70,17 +70,17 @@ public interface ServerLocatorInternal extends ServerLocator */ void notifyNodeDown(long uniqueEventID, String nodeID); - void setClusterConnection(boolean clusterConnection); + ServerLocatorInternal setClusterConnection(boolean clusterConnection); boolean isClusterConnection(); TransportConfiguration getClusterTransportConfiguration(); - void setClusterTransportConfiguration(TransportConfiguration tc); + ServerLocatorInternal setClusterTransportConfiguration(TransportConfiguration tc); Topology getTopology(); - void setPacketDecoder(PacketDecoder instance); + ClientProtocolManager newProtocolManager(); boolean isConnectable(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java index 898d194..22f6b90 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/client/impl/Topology.java @@ -156,8 +156,11 @@ public final class Topology implements Serializable TopologyMemberImpl currentMember = getMember(nodeId); if (currentMember == null) { - HornetQClientLogger.LOGGER.debug("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, - new Exception("trace")); + if (HornetQClientLogger.LOGGER.isTraceEnabled()) + { + HornetQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, + new Exception("trace")); + } currentMember = memberInput; topology.put(nodeId, currentMember); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java b/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java index c0981a3..43beff4 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/cluster/DiscoveryGroup.java @@ -26,7 +26,7 @@ import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.HornetQInterruptedException; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.server.HornetQComponent; import org.hornetq.core.server.management.Notification; @@ -118,7 +118,7 @@ public final class DiscoveryGroup implements HornetQComponent props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); - Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props); + Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, props); notificationService.sendNotification(notification); } @@ -170,7 +170,7 @@ public final class DiscoveryGroup implements HornetQComponent { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); - Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STOPPED, props); + Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props); try { notificationService.sendNotification(notification); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java index cc9a390..057dcf6 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageImpl.java @@ -26,6 +26,7 @@ import org.hornetq.api.core.SimpleString; import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer; import org.hornetq.core.message.BodyEncoder; import org.hornetq.core.protocol.core.impl.PacketImpl; +import org.hornetq.utils.ByteUtil; import org.hornetq.utils.DataConstants; import org.hornetq.utils.TypedProperties; import org.hornetq.utils.UUID; @@ -45,7 +46,9 @@ public abstract class MessageImpl implements MessageInternal { public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO"); - public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_HQ_ROUTE_TO_ACK"); + public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_HQ_SCALEDOWN_TO"); + + public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_HQ_ACK_ROUTE_TO"); // used by the bridges to set duplicates public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP"); @@ -268,6 +271,20 @@ public abstract class MessageImpl implements MessageInternal return bodyBuffer; } + public Message writeBodyBufferBytes(byte[] bytes) + { + getBodyBuffer().writeBytes(bytes); + + return this; + } + + public Message writeBodyBufferString(String string) + { + getBodyBuffer().writeString(string); + + return this; + } + public void checkCompletion() throws HornetQException { // no op on regular messages @@ -295,9 +312,10 @@ public abstract class MessageImpl implements MessageInternal return userID; } - public void setUserID(final UUID userID) + public MessageImpl setUserID(final UUID userID) { this.userID = userID; + return this; } /** @@ -345,7 +363,7 @@ public abstract class MessageImpl implements MessageInternal return durable; } - public void setDurable(final boolean durable) + public MessageImpl setDurable(final boolean durable) { if (this.durable != durable) { @@ -353,6 +371,7 @@ public abstract class MessageImpl implements MessageInternal bufferValid = false; } + return this; } public long getExpiration() @@ -360,7 +379,7 @@ public abstract class MessageImpl implements MessageInternal return expiration; } - public void setExpiration(final long expiration) + public MessageImpl setExpiration(final long expiration) { if (this.expiration != expiration) { @@ -368,6 +387,7 @@ public abstract class MessageImpl implements MessageInternal bufferValid = false; } + return this; } public long getTimestamp() @@ -375,7 +395,7 @@ public abstract class MessageImpl implements MessageInternal return timestamp; } - public void setTimestamp(final long timestamp) + public MessageImpl setTimestamp(final long timestamp) { if (this.timestamp != timestamp) { @@ -383,6 +403,7 @@ public abstract class MessageImpl implements MessageInternal bufferValid = false; } + return this; } public byte getPriority() @@ -390,7 +411,7 @@ public abstract class MessageImpl implements MessageInternal return priority; } - public void setPriority(final byte priority) + public MessageImpl setPriority(final byte priority) { if (this.priority != priority) { @@ -398,6 +419,7 @@ public abstract class MessageImpl implements MessageInternal bufferValid = false; } + return this; } public boolean isExpired() @@ -924,6 +946,37 @@ public abstract class MessageImpl implements MessageInternal return false; } + /** + * Debug Helper!!!! + * + * I'm leaving this message here without any callers for a reason: + * During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them. + * Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!! + * @return + */ + public String bodyToString() + { + getEndOfBodyPosition(); + int readerIndex1 = this.buffer.readerIndex(); + buffer.readerIndex(0); + byte[] buffer1 = new byte[buffer.writerIndex()]; + buffer.readBytes(buffer1); + buffer.readerIndex(readerIndex1); + + byte[] buffer2 = null; + if (bodyBuffer != null) + { + int readerIndex2 = this.bodyBuffer.readerIndex(); + bodyBuffer.readerIndex(0); + buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()]; + bodyBuffer.readBytes(buffer2); + bodyBuffer.readerIndex(readerIndex2); + } + + return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1); + } + + @Override @@ -961,6 +1014,10 @@ public abstract class MessageImpl implements MessageInternal int bodySize = getEndOfBodyPosition(); + // Clebert: I've started sending this on encoding due to conversions between protocols + // and making sure we are not losing the buffer start position between protocols + this.endOfBodyPosition = bodySize; + // write it buffer.setInt(BUFFER_HEADER_SPACE, bodySize); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java index b49aff7..f84cf0a 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/message/impl/MessageInternal.java @@ -26,8 +26,6 @@ import org.hornetq.utils.TypedProperties; * * @author <a href="mailto:[email protected]">Tim Fox</a> * - * TODO - this can be refactored further to separate out large message specific stuff - * * */ public interface MessageInternal extends Message http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java index 86bccd6..fd043fe 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/Channel.java @@ -136,6 +136,11 @@ public interface Channel void returnBlocking(); /** + * forces any {@link org.hornetq.core.protocol.core.Channel#sendBlocking(Packet, byte)} request to return with an exception. + */ + void returnBlocking(Throwable cause); + + /** * returns the channel lock * * @return the lock http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java index 89303ef..0a6202a 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java @@ -184,11 +184,16 @@ public final class ChannelImpl implements Channel public void returnBlocking() { + returnBlocking(null); + } + + public void returnBlocking(Throwable cause) + { lock.lock(); try { - response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall()); + response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall(cause)); sendCondition.signal(); }
