http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java b/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java new file mode 100644 index 0000000..9601f5e --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/reader/TextMessageUtil.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.reader; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.Message; +import org.hornetq.api.core.SimpleString; + +/** + * @author Clebert Suconic + */ + +public class TextMessageUtil extends MessageUtil +{ + + /** + * Utility method to set the Text message on a message body + */ + public static void writeBodyText(Message message, SimpleString text) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.clear(); + buff.writeNullableSimpleString(text); + } + + /** + * Utility method to set the Text message on a message body + */ + public static SimpleString readBodyText(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.resetReaderIndex(); + return buff.readNullableSimpleString(); + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java b/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java new file mode 100644 index 0000000..5e75719 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/reader/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * Provides reading methods for JMS like objects. + * This isolates the logic from the JMS client case you need this kind of functionality from core. + * This is also used on conversions between protocols such as AMQP and Core where + * this is done at the server's + * @author Clebert Suconic + */ +package org.hornetq.reader; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java new file mode 100644 index 0000000..f5a910e --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/AbstractRemotingConnection.java @@ -0,0 +1,219 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.spi.core.protocol; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.HornetQInterruptedException; +import org.hornetq.core.client.HornetQClientLogger; +import org.hornetq.core.client.HornetQClientMessageBundle; +import org.hornetq.core.remoting.CloseListener; +import org.hornetq.core.remoting.FailureListener; +import org.hornetq.spi.core.remoting.Connection; + +/** + * @author Clebert Suconic + */ + +public abstract class AbstractRemotingConnection implements RemotingConnection +{ + protected final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>(); + protected final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>(); + protected final Connection transportConnection; + protected final Executor executor; + protected final long creationTime; + protected volatile boolean dataReceived; + + public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) + { + this.transportConnection = transportConnection; + this.executor = executor; + this.creationTime = System.currentTimeMillis(); + } + + public List<FailureListener> getFailureListeners() + { + return new ArrayList<FailureListener>(failureListeners); + } + + protected void callFailureListeners(final HornetQException me, String scaleDownTargetNodeID) + { + final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners); + + for (final FailureListener listener : listenersClone) + { + try + { + listener.connectionFailed(me, false, scaleDownTargetNodeID); + } + catch (HornetQInterruptedException interrupted) + { + // this is an expected behaviour.. no warn or error here + HornetQClientLogger.LOGGER.debug("thread interrupted", interrupted); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQClientLogger.LOGGER.errorCallingFailureListener(t); + } + } + } + + + protected void callClosingListeners() + { + final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners); + + for (final CloseListener listener : listenersClone) + { + try + { + listener.connectionClosed(); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQClientLogger.LOGGER.errorCallingFailureListener(t); + } + } + } + + public void setFailureListeners(final List<FailureListener> listeners) + { + failureListeners.clear(); + + failureListeners.addAll(listeners); + } + + public Object getID() + { + return transportConnection.getID(); + } + + public String getRemoteAddress() + { + return transportConnection.getRemoteAddress(); + } + + public void addFailureListener(final FailureListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull(); + } + failureListeners.add(listener); + } + + public boolean removeFailureListener(final FailureListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull(); + } + + return failureListeners.remove(listener); + } + + public void addCloseListener(final CloseListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull(); + } + + closeListeners.add(listener); + } + + public boolean removeCloseListener(final CloseListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull(); + } + + return closeListeners.remove(listener); + } + + public List<CloseListener> removeCloseListeners() + { + List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners); + + closeListeners.clear(); + + return ret; + } + + public List<FailureListener> removeFailureListeners() + { + List<FailureListener> ret = getFailureListeners(); + + failureListeners.clear(); + + return ret; + } + + public void setCloseListeners(List<CloseListener> listeners) + { + closeListeners.clear(); + + closeListeners.addAll(listeners); + } + + public HornetQBuffer createBuffer(final int size) + { + return transportConnection.createBuffer(size); + } + + public Connection getTransportConnection() + { + return transportConnection; + } + + public long getCreationTime() + { + return creationTime; + } + + public boolean checkDataReceived() + { + boolean res = dataReceived; + + dataReceived = false; + + return res; + } + + /* + * This can be called concurrently by more than one thread so needs to be locked + */ + public void fail(final HornetQException me) + { + fail(me, null); + } + + public void bufferReceived(final Object connectionID, final HornetQBuffer buffer) + { + dataReceived = true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java index 1d1219f..5b60aac 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/ConnectionEntry.java @@ -32,6 +32,11 @@ public class ConnectionEntry public final Executor connectionExecutor; + public Object getID() + { + return connection.getID(); + } + public ConnectionEntry(final RemotingConnection connection, final Executor connectionExecutor, final long lastCheck, final long ttl) { this.connection = connection; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java index 7e4114f..932f771 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java @@ -24,6 +24,10 @@ import org.hornetq.spi.core.remoting.Connection; /** * A RemotingConnection is a connection between a client and a server. * + * + * Perhaps a better name for this class now would be ProtocolConnection as this + * represents the link with the used protocol + * * @author <a href="mailto:[email protected]">Tim Fox</a> * @author <a href="mailto:[email protected]">Jeff Mesnil</a> */ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java index db552a2..e56f1ac 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/AbstractConnector.java @@ -14,8 +14,6 @@ package org.hornetq.spi.core.remoting; import java.util.Map; -import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManagerFactory; - /** * Abstract connector * @@ -25,16 +23,8 @@ public abstract class AbstractConnector implements Connector { protected final Map<String, Object> configuration; - private static final ClientProtocolManagerFactory protocolWrapperFactory = new HornetQClientProtocolManagerFactory(); - protected AbstractConnector(Map<String, Object> configuration) { this.configuration = configuration; } - - public ClientProtocolManagerFactory getProtocolManagerFactory() - { - return protocolWrapperFactory; - } - } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java index dc614b4..8ae772b 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManager.java @@ -16,8 +16,10 @@ package org.hornetq.spi.core.remoting; import java.util.List; import java.util.concurrent.locks.Lock; +import io.netty.channel.ChannelPipeline; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Interceptor; +import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.spi.core.protocol.RemotingConnection; /** @@ -29,12 +31,10 @@ public interface ClientProtocolManager /// Life Cycle Methods: - RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, ProtocolResponseHandler protocolResponseHandler); + RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler); RemotingConnection getCurrentConnection(); - void setResponseHandler(ProtocolResponseHandler handler); - Lock lockSessionCreation(); boolean waitOnLatch(long milliseconds) throws InterruptedException; @@ -49,11 +49,10 @@ public interface ClientProtocolManager /// Sending methods + void addChannelHandlers(ChannelPipeline pipeline); void sendSubscribeTopology(boolean isServer); - void shakeHands(); - void ping(long connectionTTL); SessionContext createSessionContext(final String name, @@ -66,7 +65,14 @@ public interface ClientProtocolManager int minLargeMessageSize, int confirmationWindowSize) throws HornetQException; - boolean cleanupBeforeFailover(); + boolean cleanupBeforeFailover(HornetQException cause); boolean checkForFailover(String liveNodeID) throws HornetQException; + + void setSessionFactory(ClientSessionFactory factory); + + ClientSessionFactory getSessionFactory(); + + String getName(); + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java index 0c40e86..448f92d 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ClientProtocolManagerFactory.java @@ -12,12 +12,13 @@ */ package org.hornetq.spi.core.remoting; -import org.hornetq.core.client.impl.ClientSessionFactoryInternal; +import java.io.Serializable; /** * @author Clebert Suconic */ -public interface ClientProtocolManagerFactory +public interface ClientProtocolManagerFactory extends Serializable { - ClientProtocolManager newProtocolManager(ClientSessionFactoryInternal factoryInternal); + + ClientProtocolManager newProtocolManager(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java index 1df470b..69c338b 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connection.java @@ -12,14 +12,17 @@ */ package org.hornetq.spi.core.remoting; +import io.netty.channel.ChannelFutureListener; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.security.HornetQPrincipal; +import org.hornetq.spi.core.protocol.RemotingConnection; /** * The connection used by a channel to write data to. * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author Clebert Suconic */ public interface Connection { @@ -31,6 +34,11 @@ public interface Connection */ HornetQBuffer createBuffer(int size); + + RemotingConnection getProtocolConnection(); + + void setProtocolConnection(RemotingConnection connection); + /** * returns the unique id of this wire. * @@ -48,12 +56,29 @@ public interface Connection void write(HornetQBuffer buffer, boolean flush, boolean batched); /** + * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection. + * + * @param buffer the buffer to write + * @param flush whether to flush the buffers onto the wire + * @param batched whether the packet is allowed to batched for better performance + */ + void write(HornetQBuffer buffer, boolean flush, boolean batched, ChannelFutureListener futureListener); + + /** * writes the buffer to the connection with no flushing or batching * * @param buffer the buffer to write */ void write(HornetQBuffer buffer); + + /** + * This should close the internal channel without calling any listeners. + * This is to avoid a situation where the broker is busy writing on an internal thread. + * This should close the socket releasing any pending threads. + */ + void forceClose(); + /** * Closes the connection. */ @@ -77,9 +102,16 @@ public interface Connection /** * Generates a {@link TransportConfiguration} to be used to connect to the same target this is * connected to. - * @return TranportConfiguration + * @return TransportConfiguration */ TransportConfiguration getConnectorConfig(); HornetQPrincipal getDefaultHornetQPrincipal(); + + /** + * the InVM Connection has some special handling as it doesn't use Netty ProtocolChannel + * we will use this method Instead of using instanceof + * @return + */ + boolean isUsingProtocolHandling(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java index 99dfa43..474f189 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/Connector.java @@ -50,8 +50,6 @@ public interface Connector */ Connection createConnection(); - ClientProtocolManagerFactory getProtocolManagerFactory(); - /** * If the configuration is equivalent to this connector, which means * if the parameter configuration is used to create a connection to a target http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java index d77b107..5d0b458 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConnectorFactory.java @@ -17,6 +17,8 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.hornetq.api.core.TransportConfigurationHelper; + /** * A ConnectorFactory is used by the client for creating connectors. * <p> @@ -24,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; * * @author <a href="mailto:[email protected]">Tim Fox</a> */ -public interface ConnectorFactory +public interface ConnectorFactory extends TransportConfigurationHelper { /** * creates a new instance of a connector. @@ -42,7 +44,8 @@ public interface ConnectorFactory ConnectionLifeCycleListener listener, Executor closeExecutor, Executor threadPool, - ScheduledExecutorService scheduledThreadPool); + ScheduledExecutorService scheduledThreadPool, + ClientProtocolManager protocolManager); /** * Returns the allowable properties for this connector. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java new file mode 100644 index 0000000..082bcd4 --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ConsumerContext.java @@ -0,0 +1,22 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.spi.core.remoting; + +/** + * @author Clebert Suconic + */ + +public abstract class ConsumerContext +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java deleted file mode 100644 index e5a7c91..0000000 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/ProtocolResponseHandler.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.spi.core.remoting; - -import org.hornetq.api.core.Pair; -import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.spi.core.protocol.RemotingConnection; - -/** - * @author Clebert Suconic - */ - -public interface ProtocolResponseHandler -{ - // This is sent when the server is telling the client the node is being disconnected - void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID); - - void notifyNodeUp(long uniqueEventID, - final String backupGroupName, - final String scaleDownGroupName, - final String nodeName, - final Pair<TransportConfiguration, TransportConfiguration> connectorPair, - final boolean isLast); - - // This is sent when any node on the cluster topology is going down - void notifyNodeDown(final long eventTime, final String nodeID); -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java index af6e2f0..87d97de 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/SessionContext.java @@ -27,6 +27,7 @@ import org.hornetq.api.core.client.SendAcknowledgementHandler; import org.hornetq.core.client.impl.ClientConsumerInternal; import org.hornetq.core.client.impl.ClientLargeMessageInternal; import org.hornetq.core.client.impl.ClientMessageInternal; +import org.hornetq.core.client.impl.ClientProducerCreditsImpl; import org.hornetq.core.client.impl.ClientSessionInternal; import org.hornetq.core.message.impl.MessageInternal; import org.hornetq.spi.core.protocol.RemotingConnection; @@ -41,8 +42,6 @@ public abstract class SessionContext { protected ClientSessionInternal session; - protected final String name; - protected SendAcknowledgementHandler sendAckHandler; protected volatile RemotingConnection remotingConnection; @@ -50,10 +49,9 @@ public abstract class SessionContext protected final IDGenerator idGenerator = new SimpleIDGenerator(0); - public SessionContext(final String name, RemotingConnection remotingConnection) + public SessionContext(RemotingConnection remotingConnection) { this.remotingConnection = remotingConnection; - this.name = name; } @@ -88,7 +86,7 @@ public abstract class SessionContext public abstract boolean supportsLargeMessage(); - protected void handleReceiveLargeMessage(long consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception + protected void handleReceiveLargeMessage(ConsumerContext consumerID, ClientLargeMessageInternal clientLargeMessage, long largeMessageSize) throws Exception { ClientSessionInternal session = this.session; if (session != null) @@ -97,7 +95,7 @@ public abstract class SessionContext } } - protected void handleReceiveMessage(final long consumerID, final ClientMessageInternal message) throws Exception + protected void handleReceiveMessage(ConsumerContext consumerID, final ClientMessageInternal message) throws Exception { ClientSessionInternal session = this.session; @@ -107,7 +105,7 @@ public abstract class SessionContext } } - protected void handleReceiveContinuation(final long consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception + protected void handleReceiveContinuation(final ConsumerContext consumerID, byte[] chunk, int flowControlSize, boolean isContinues) throws Exception { ClientSessionInternal session = this.session; if (session != null) @@ -138,7 +136,7 @@ public abstract class SessionContext public abstract int getCreditsOnSendingFull(MessageInternal msgI); - public abstract void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler) throws HornetQException; + public abstract void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws HornetQException; /** * it should return the number of credits (or bytes) used to send this packet @@ -252,7 +250,7 @@ public abstract class SessionContext /** * Interrupt and return any blocked calls */ - public abstract void returnBlocking(); + public abstract void returnBlocking(HornetQException cause); /** * it will lock the communication channel of the session avoiding anything to come while failover is happening. @@ -266,4 +264,5 @@ public abstract class SessionContext public abstract void cleanup(); + public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java new file mode 100644 index 0000000..f7b99ec --- /dev/null +++ b/hornetq-core-client/src/main/java/org/hornetq/spi/core/remoting/TopologyResponseHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.spi.core.remoting; + +import org.hornetq.api.core.Pair; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.spi.core.protocol.RemotingConnection; + +/** + * @author Clebert Suconic + */ + +public interface TopologyResponseHandler +{ + // This is sent when the server is telling the client the node is being disconnected + void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID); + + void notifyNodeUp(long uniqueEventID, + final String backupGroupName, + final String scaleDownGroupName, + final String nodeName, + final Pair<TransportConfiguration, TransportConfiguration> connectorPair, + final boolean isLast); + + // This is sent when any node on the cluster topology is going down + void notifyNodeDown(final long eventTime, final String nodeID); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java ---------------------------------------------------------------------- diff --git a/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java b/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java index 70facaf..51a32a0 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java +++ b/hornetq-core-client/src/main/java/org/hornetq/utils/FutureLatch.java @@ -22,7 +22,19 @@ import java.util.concurrent.TimeUnit; */ public class FutureLatch implements Runnable { - private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch; + + public FutureLatch() + { + super(); + latch = new CountDownLatch(1); + } + + public FutureLatch(int latches) + { + super(); + latch = new CountDownLatch(latches); + } public boolean await(final long timeout) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-dto/pom.xml b/hornetq-dto/pom.xml new file mode 100644 index 0000000..ed253c8 --- /dev/null +++ b/hornetq-dto/pom.xml @@ -0,0 +1,147 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-pom</artifactId> + <version>2.5.0-SNAPSHOT</version> + </parent> + + <artifactId>hornetq-dto</artifactId> + <packaging>jar</packaging> + <name>HornetQ DTO</name> + + <properties> + <hornetq.basedir>${project.basedir}/..</hornetq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson-databind.version}</version> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>target/schema</directory> + <includes> + <include>**/*</include> + </includes> + </resource> + <resource> + <directory>src/main/resources</directory> + <includes> + <include>**/*</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <phase>generate-resources</phase> + <configuration> + <tasks> + <taskdef name="schemagen" classname="com.sun.tools.jxc.SchemaGenTask"/> + <mkdir dir="${project.build.directory}/schema/org/hornetq/dto"/> + <echo message="Generating XSD to: ${project.build.directory}/schema/org/hornetq/dto"/> + <schemagen srcdir="${basedir}/.." destdir="${project.build.directory}/schema/org/hornetq/dto" + includeantruntime="false"> + <schema namespace="http://hornetq.org/schema" file="hornetq.xsd"/> + <classpath refid="maven.compile.classpath"/> + <include name="**/package-info.java"/> + <include name="**/*DTO.java"/> + <exclude name="**/.git/**"/> + <exclude name="**/.svn/**"/> + </schemagen> + <copy todir="${project.build.directory}/classes"> + <fileset dir="${project.build.directory}/schema"/> + </copy> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + <version>2.2.7</version> + </dependency> + <dependency> + <groupId>com.sun.xml.bind</groupId> + <artifactId>jaxb-impl</artifactId> + <version>2.2.7</version> + </dependency> + <dependency> + <groupId>com.sun.xml.bind</groupId> + <artifactId>jaxb-jxc</artifactId> + <version>2.2.7</version> + </dependency> + </dependencies> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>jdk-1.5</id> + <activation> + <jdk>1.5</jdk> + </activation> + <dependencies> + <dependency> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + <version>${jaxb-api-version}</version> + </dependency> + <dependency> + <groupId>com.sun.xml.bind</groupId> + <artifactId>jaxb-impl</artifactId> + <version>${jaxb-version}</version> + </dependency> + </dependencies> + </profile> + + <profile> + <id>ibmjdk</id> + <activation> + <file> + <exists>${java.home}/../lib/tools.jar</exists> + </file> + </activation> + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <dependencies> + <dependency> + <groupId>com.sun</groupId> + <artifactId>tools</artifactId> + <!--the real JDK version could be 1.5 or 1.6--> + <version>1.5.0</version> + <scope>system</scope> + <optional>true</optional> + <systemPath>${java.home}/../lib/tools.jar</systemPath> + </dependency> + </dependencies> + </plugin> + </plugins> + </pluginManagement> + </build> + </profile> + </profiles> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java new file mode 100644 index 0000000..d9836b5 --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/BasicSecurityDTO.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "basic-security") +@XmlAccessorType(XmlAccessType.FIELD) +public class BasicSecurityDTO extends SecurityDTO +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java new file mode 100644 index 0000000..9c31e80 --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/BrokerDTO.java @@ -0,0 +1,40 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElementRef; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "broker") +@XmlAccessorType(XmlAccessType.FIELD) +@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, include = JsonTypeInfo.As.PROPERTY, property = "@class") +public class BrokerDTO +{ + + @XmlElementRef + public CoreDTO core; + + @XmlElementRef(required = false) + public JmsDTO jms; + + @XmlElementRef + public SecurityDTO security; + + @XmlElementRef + public NamingDTO naming; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java new file mode 100644 index 0000000..7cd62a8 --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/CoreDTO.java @@ -0,0 +1,28 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "core") +@XmlAccessorType(XmlAccessType.FIELD) +public class CoreDTO +{ + + @XmlAttribute + public String configuration; + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java new file mode 100644 index 0000000..6990298 --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/JmsDTO.java @@ -0,0 +1,29 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "jms") +@XmlAccessorType(XmlAccessType.FIELD) +public class JmsDTO +{ + + @XmlAttribute + public String configuration; + +} + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java new file mode 100644 index 0000000..8e614ae --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/NamingDTO.java @@ -0,0 +1,35 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "naming") +@XmlAccessorType(XmlAccessType.FIELD) +public class NamingDTO +{ + @XmlAttribute + public String bindAddress = "localhost"; + + @XmlAttribute + public int port = 1099; + + @XmlAttribute + public String rmiBindAddress = "localhost"; + + @XmlAttribute + public int rmiPort = 1098; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java b/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java new file mode 100644 index 0000000..bf52ae3 --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/SecurityDTO.java @@ -0,0 +1,26 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlType; + +@XmlType(name = "security") +@XmlAccessorType(XmlAccessType.FIELD) +@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, include = JsonTypeInfo.As.PROPERTY, property = "@class") +public class SecurityDTO +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java b/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java new file mode 100644 index 0000000..58037ff --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/XmlUtil.java @@ -0,0 +1,106 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.dto; + +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.util.StreamReaderDelegate; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class XmlUtil +{ + + /** + * Changes ${property} with values from a properties object + */ + static class PropertiesFilter extends StreamReaderDelegate + { + + static final Pattern pattern = Pattern.compile("\\$\\{([^\\}]+)\\}"); + private final Properties props; + + public PropertiesFilter(XMLStreamReader parent, Properties props) + { + super(parent); + this.props = props; + } + + public String getAttributeValue(int index) + { + return filter(super.getAttributeValue(index)); + } + + public String filter(String str) + { + int start = 0; + while (true) + { + Matcher matcher = pattern.matcher(str); + if (!matcher.find(start)) + { + break; + } + String group = matcher.group(1); + String property = props.getProperty(group); + if (property != null) + { + str = matcher.replaceFirst(Matcher.quoteReplacement(property)); + } + else + { + start = matcher.end(); + } + } + return str; + } + + } + + private static final XMLInputFactory factory = XMLInputFactory.newInstance(); + + public static <T> T decode(Class<T> clazz, File configuration) throws Exception + { + JAXBContext jaxbContext = JAXBContext.newInstance("org.hornetq.dto"); + + Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); + SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + sf.setFeature("http://apache.org/xml/features/validation/schema-full-checking", false); + InputStream xsdStream = XmlUtil.class.getClassLoader().getResourceAsStream("org/hornetq/dto/hornetq.xsd"); + StreamSource xsdSource = new StreamSource(xsdStream); + Schema schema = sf.newSchema(xsdSource); + unmarshaller.setSchema(schema); + + XMLStreamReader reader = factory.createXMLStreamReader(new FileInputStream(configuration)); + //TODO - support properties files + Properties props = System.getProperties(); + + if (props != null) + { + reader = new PropertiesFilter(reader, props); + } + + return clazz.cast(unmarshaller.unmarshal(reader)); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java b/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java new file mode 100644 index 0000000..4da6f69 --- /dev/null +++ b/hornetq-dto/src/main/java/org/hornetq/dto/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * The JAXB POJOs for the XML configuration of HornetQ broker + */ [email protected]( + namespace = "http://hornetq.org/schema", + elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED) +package org.hornetq.dto; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index ---------------------------------------------------------------------- diff --git a/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index b/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index new file mode 100644 index 0000000..3cec94b --- /dev/null +++ b/hornetq-dto/src/main/resources/org/hornetq/dto/jaxb.index @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Copyright 2005-2014 Red Hat, Inc. +## Red Hat licenses this file to you under the Apache License, version +## 2.0 (the "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## http://www.apache.org/licenses/LICENSE-2.0 +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +## implied. See the License for the specific language governing +## permissions and limitations under the License. +## --------------------------------------------------------------------------- +BrokerDTO +CoreDTO +JmsDTO +SecurityDTO +BasicSecurityDTO +NamingDTO + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java index b672ab7..ae7cba1 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSQueueControl.java @@ -277,4 +277,12 @@ public interface JMSQueueControl extends DestinationControl @Operation(desc = "List all the existent consumers on the Queue") String listConsumersAsJSON() throws Exception; + /** + * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call + * any other measure. + * It is useful if you need the exact number of counts on a message + * @throws Exception + */ + void flushExecutor(); + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java index d0d7f6c..f58692e 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/api/jms/management/JMSServerControl.java @@ -268,6 +268,18 @@ public interface JMSServerControl boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception; /** + * Closes all the connections on this server for consumers which are consuming from a queue associated with a particular address. + */ + @Operation(desc = "Closes all the consumer connections for the given HornetQ address", impact = MBeanOperationInfo.INFO) + boolean closeConsumerConnectionsForAddress(@Parameter(desc = "a HornetQ address", name = "address") String address) throws Exception; + + /** + * Closes all the connections on this server for sessions using a particular user name. + */ + @Operation(desc = "Closes all the connections for session using a particular user name", impact = MBeanOperationInfo.INFO) + boolean closeConnectionsForUser(@Parameter(desc = "a user name", name = "userName") String address) throws Exception; + + /** * Lists all the IDs of the connections connected to this server. */ @Operation(desc = "List all the connection IDs", impact = MBeanOperationInfo.INFO) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java index 55a55dc..2cf4154 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQBytesMessage.java @@ -24,6 +24,31 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; import org.hornetq.core.message.impl.MessageImpl; +import static org.hornetq.reader.BytesMessageUtil.bytesMessageReset; +import static org.hornetq.reader.BytesMessageUtil.bytesReadBoolean; +import static org.hornetq.reader.BytesMessageUtil.bytesReadByte; +import static org.hornetq.reader.BytesMessageUtil.bytesReadBytes; +import static org.hornetq.reader.BytesMessageUtil.bytesReadChar; +import static org.hornetq.reader.BytesMessageUtil.bytesReadDouble; +import static org.hornetq.reader.BytesMessageUtil.bytesReadFloat; +import static org.hornetq.reader.BytesMessageUtil.bytesReadInt; +import static org.hornetq.reader.BytesMessageUtil.bytesReadLong; +import static org.hornetq.reader.BytesMessageUtil.bytesReadShort; +import static org.hornetq.reader.BytesMessageUtil.bytesReadUTF; +import static org.hornetq.reader.BytesMessageUtil.bytesReadUnsignedByte; +import static org.hornetq.reader.BytesMessageUtil.bytesReadUnsignedShort; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteBoolean; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteByte; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteBytes; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteChar; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteDouble; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteFloat; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteInt; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteLong; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteObject; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteShort; +import static org.hornetq.reader.BytesMessageUtil.bytesWriteUTF; + /** * HornetQ implementation of a JMS {@link BytesMessage}. * @@ -85,7 +110,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readBoolean(); + return bytesReadBoolean(message); } catch (IndexOutOfBoundsException e) { @@ -98,7 +123,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readByte(); + return bytesReadByte(message); } catch (IndexOutOfBoundsException e) { @@ -111,7 +136,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readUnsignedByte(); + return bytesReadUnsignedByte(message); } catch (IndexOutOfBoundsException e) { @@ -124,7 +149,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readShort(); + return bytesReadShort(message); } catch (IndexOutOfBoundsException e) { @@ -137,7 +162,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readUnsignedShort(); + return bytesReadUnsignedShort(message); } catch (IndexOutOfBoundsException e) { @@ -150,7 +175,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return (char)getBuffer().readShort(); + return bytesReadChar(message); } catch (IndexOutOfBoundsException e) { @@ -163,7 +188,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readInt(); + return bytesReadInt(message); } catch (IndexOutOfBoundsException e) { @@ -176,7 +201,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readLong(); + return bytesReadLong(message); } catch (IndexOutOfBoundsException e) { @@ -189,7 +214,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return Float.intBitsToFloat(getBuffer().readInt()); + return bytesReadFloat(message); } catch (IndexOutOfBoundsException e) { @@ -202,7 +227,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return Double.longBitsToDouble(getBuffer().readLong()); + return bytesReadDouble(message); } catch (IndexOutOfBoundsException e) { @@ -215,7 +240,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkRead(); try { - return getBuffer().readUTF(); + return bytesReadUTF(message); } catch (IndexOutOfBoundsException e) { @@ -232,74 +257,63 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage public int readBytes(final byte[] value) throws JMSException { - return readBytes(value, value.length); + checkRead(); + return bytesReadBytes(message, value); } public int readBytes(final byte[] value, final int length) throws JMSException { checkRead(); + return bytesReadBytes(message, value, length); - if (!getBuffer().readable()) - { - return -1; - } - - int read = Math.min(length, getBuffer().readableBytes()); - - if (read != 0) - { - getBuffer().readBytes(value, 0, read); - } - - return read; } public void writeBoolean(final boolean value) throws JMSException { checkWrite(); - getBuffer().writeBoolean(value); + bytesWriteBoolean(message, value); } public void writeByte(final byte value) throws JMSException { checkWrite(); - getBuffer().writeByte(value); + bytesWriteByte(message, value); } public void writeShort(final short value) throws JMSException { checkWrite(); - getBuffer().writeShort(value); + bytesWriteShort(message, value); } public void writeChar(final char value) throws JMSException { checkWrite(); - getBuffer().writeShort((short)value); + bytesWriteChar(message, value); } public void writeInt(final int value) throws JMSException { checkWrite(); - getBuffer().writeInt(value); + bytesWriteInt(message, value); } public void writeLong(final long value) throws JMSException { checkWrite(); - getBuffer().writeLong(value); + bytesWriteLong(message, value); } public void writeFloat(final float value) throws JMSException { checkWrite(); - getBuffer().writeInt(Float.floatToIntBits(value)); + bytesWriteFloat(message, value); } public void writeDouble(final double value) throws JMSException { checkWrite(); - getBuffer().writeLong(Double.doubleToLongBits(value)); + bytesWriteDouble(message, value); } public void writeUTF(final String value) throws JMSException @@ -307,7 +321,7 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage checkWrite(); try { - getBuffer().writeUTF(value); + bytesWriteUTF(message, value); } catch (Exception e) { @@ -316,67 +330,25 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage je.initCause(e); throw je; } + } public void writeBytes(final byte[] value) throws JMSException { checkWrite(); - getBuffer().writeBytes(value); + bytesWriteBytes(message, value); } public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException { checkWrite(); - getBuffer().writeBytes(value, offset, length); + bytesWriteBytes(message, value, offset, length); } public void writeObject(final Object value) throws JMSException { - if (value == null) - { - throw new NullPointerException("Attempt to write a null value"); - } - if (value instanceof String) - { - writeUTF((String)value); - } - else if (value instanceof Boolean) - { - writeBoolean((Boolean)value); - } - else if (value instanceof Character) - { - writeChar((Character)value); - } - else if (value instanceof Byte) - { - writeByte((Byte)value); - } - else if (value instanceof Short) - { - writeShort((Short)value); - } - else if (value instanceof Integer) - { - writeInt((Integer)value); - } - else if (value instanceof Long) - { - writeLong((Long)value); - } - else if (value instanceof Float) - { - writeFloat((Float)value); - } - else if (value instanceof Double) - { - writeDouble((Double)value); - } - else if (value instanceof byte[]) - { - writeBytes((byte[])value); - } - else + checkWrite(); + if (!bytesWriteObject(message, value)) { throw new MessageFormatException("Invalid object for properties"); } @@ -389,13 +361,9 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage readOnly = true; bodyLength = message.getBodySize(); - - getBuffer().resetReaderIndex(); - } - else - { - getBuffer().resetReaderIndex(); } + + bytesMessageReset(message); } @Override @@ -407,11 +375,20 @@ public class HornetQBytesMessage extends HornetQMessage implements BytesMessage // HornetQRAMessage overrides ---------------------------------------- @Override - public void clearBody() + public void clearBody() throws JMSException { super.clearBody(); - getBuffer().clear(); + try + { + getBuffer().clear(); + } + catch (RuntimeException e) + { + JMSException e2 = new JMSException(e.getMessage()); + e2.initCause(e); + throw e2; + } } public long getBodyLength() throws JMSException http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java index 5c13b1f..5d4413b 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQConnection.java @@ -43,6 +43,7 @@ import org.hornetq.api.core.client.SessionFailureListener; import org.hornetq.api.jms.HornetQJMSConstants; import org.hornetq.core.client.impl.ClientSessionInternal; import org.hornetq.core.version.Version; +import org.hornetq.reader.MessageUtil; import org.hornetq.utils.ConcurrentHashSet; import org.hornetq.utils.UUIDGenerator; import org.hornetq.utils.VersionLoader; @@ -70,7 +71,7 @@ public class HornetQConnection extends HornetQConnectionForContextImpl implement public static final String EXCEPTION_DISCONNECT = "DISCONNECT"; - public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__HQ_CID"); + public static final SimpleString CONNECTION_ID_PROPERTY_NAME = MessageUtil.CONNECTION_ID_PROPERTY_NAME; // Static --------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java index 564da7d..d8d9cf7 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMapMessage.java @@ -28,6 +28,10 @@ import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; import org.hornetq.utils.TypedProperties; + +import static org.hornetq.reader.MapMessageUtil.writeBodyMap; +import static org.hornetq.reader.MapMessageUtil.readBodyMap; + /** * HornetQ implementation of a JMS MapMessage. * @@ -46,7 +50,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag // Attributes ---------------------------------------------------- - private TypedProperties map = new TypedProperties(); + private final TypedProperties map = new TypedProperties(); private boolean invalid; @@ -61,8 +65,6 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag { super(HornetQMapMessage.TYPE, session); - map = new TypedProperties(); - invalid = true; } @@ -368,7 +370,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag // HornetQRAMessage overrides ---------------------------------------- @Override - public void clearBody() + public void clearBody() throws JMSException { super.clearBody(); @@ -382,10 +384,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag { if (invalid) { - message.getBodyBuffer().resetWriterIndex(); - - map.encode(message.getBodyBuffer()); - + writeBodyMap(message, map); invalid = false; } @@ -397,7 +396,7 @@ public final class HornetQMapMessage extends HornetQMessage implements MapMessag { super.doBeforeReceive(); - map.decode(message.getBodyBuffer()); + readBodyMap(message, map); } // Package protected --------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java index ac66539..16fb52e 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQMessage.java @@ -15,12 +15,10 @@ package org.hornetq.jms.client; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.OutputStream; -import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import javax.jms.DeliveryMode; @@ -40,10 +38,11 @@ import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.jms.HornetQJMSConstants; -import org.hornetq.core.client.impl.ClientMessageImpl; import org.hornetq.core.message.impl.MessageInternal; +import org.hornetq.reader.MessageUtil; import org.hornetq.utils.UUID; + /** * HornetQ implementation of a JMS Message. * <br> @@ -64,23 +63,6 @@ import org.hornetq.utils.UUID; public class HornetQMessage implements javax.jms.Message { // Constants ----------------------------------------------------- - - private static final SimpleString REPLYTO_HEADER_NAME = ClientMessageImpl.REPLYTO_HEADER_NAME; - - private static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID"); - - private static final SimpleString TYPE_HEADER_NAME = new SimpleString("JMSType"); - - private static final SimpleString JMS = new SimpleString("JMS"); - - private static final SimpleString JMSX = new SimpleString("JMSX"); - - private static final SimpleString JMS_ = new SimpleString("JMS_"); - - private static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount"; - - private static final String JMSXGROUPID = "JMSXGroupID"; - public static final byte TYPE = org.hornetq.api.core.Message.DEFAULT_TYPE; public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage) @@ -363,55 +345,34 @@ public class HornetQMessage implements javax.jms.Message public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - Object obj = message.getObjectProperty(HornetQMessage.CORRELATIONID_HEADER_NAME); - - if (obj instanceof byte[]) - { - return (byte[])obj; - } - else - { - return null; - } + return MessageUtil.getJMSCorrelationIDAsBytes(message); } public void setJMSCorrelationIDAsBytes(final byte[] correlationID) throws JMSException { - if (correlationID == null || correlationID.length == 0) + try + { + MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID); + } + catch (HornetQException e) { - throw new JMSException("Please specify a non-zero length byte[]"); + JMSException ex = new JMSException(e.getMessage()); + ex.initCause(e); + throw ex; } - message.putBytesProperty(HornetQMessage.CORRELATIONID_HEADER_NAME, correlationID); } public void setJMSCorrelationID(final String correlationID) throws JMSException { - if (correlationID == null) - { - message.removeProperty(HornetQMessage.CORRELATIONID_HEADER_NAME); - - jmsCorrelationID = null; - } - else - { - message.putStringProperty(HornetQMessage.CORRELATIONID_HEADER_NAME, new SimpleString(correlationID)); - - jmsCorrelationID = correlationID; - } + MessageUtil.setJMSCorrelationID(message, correlationID); + jmsCorrelationID = correlationID; } public String getJMSCorrelationID() throws JMSException { if (jmsCorrelationID == null) { - try - { - jmsCorrelationID = message.getStringProperty(HornetQMessage.CORRELATIONID_HEADER_NAME); - } - catch (HornetQPropertyConversionException e) - { - jmsCorrelationID = null; - } + jmsCorrelationID = MessageUtil.getJMSCorrelationID(message); } return jmsCorrelationID; @@ -421,7 +382,8 @@ public class HornetQMessage implements javax.jms.Message { if (replyTo == null) { - SimpleString repl = message.getSimpleStringProperty(HornetQMessage.REPLYTO_HEADER_NAME); + + SimpleString repl = MessageUtil.getJMSReplyTo(message); if (repl != null) { @@ -433,10 +395,10 @@ public class HornetQMessage implements javax.jms.Message public void setJMSReplyTo(final Destination dest) throws JMSException { + if (dest == null) { - message.removeProperty(HornetQMessage.REPLYTO_HEADER_NAME); - + MessageUtil.setJMSReplyTo(message, null); replyTo = null; } else @@ -448,7 +410,7 @@ public class HornetQMessage implements javax.jms.Message HornetQDestination jbd = (HornetQDestination)dest; - message.putStringProperty(HornetQMessage.REPLYTO_HEADER_NAME, jbd.getSimpleAddress()); + MessageUtil.setJMSReplyTo(message, jbd.getSimpleAddress()); replyTo = jbd; } @@ -520,7 +482,7 @@ public class HornetQMessage implements javax.jms.Message { if (type != null) { - message.putStringProperty(HornetQMessage.TYPE_HEADER_NAME, new SimpleString(type)); + MessageUtil.setJMSType(message, type); jmsType = type; } @@ -530,12 +492,7 @@ public class HornetQMessage implements javax.jms.Message { if (jmsType == null) { - SimpleString ss = message.getSimpleStringProperty(HornetQMessage.TYPE_HEADER_NAME); - - if (ss != null) - { - jmsType = ss.toString(); - } + jmsType = MessageUtil.getJMSType(message); } return jmsType; } @@ -564,35 +521,20 @@ public class HornetQMessage implements javax.jms.Message public void clearProperties() throws JMSException { - List<SimpleString> toRemove = new ArrayList<SimpleString>(); - - for (SimpleString propName : message.getPropertyNames()) - { - if (!propName.startsWith(HornetQMessage.JMS) || propName.startsWith(HornetQMessage.JMSX) || - propName.startsWith(HornetQMessage.JMS_)) - { - toRemove.add(propName); - } - } - for (SimpleString propName : toRemove) - { - message.removeProperty(propName); - } + MessageUtil.clearProperties(message); propertiesReadOnly = false; } - public void clearBody() + public void clearBody() throws JMSException { readOnly = false; } public boolean propertyExists(final String name) throws JMSException { - return message.containsProperty(new SimpleString(name)) || name.equals(HornetQMessage.JMSXDELIVERYCOUNT) || - HornetQMessage.JMSXGROUPID.equals(name) && - message.containsProperty(org.hornetq.api.core.Message.HDR_GROUP_ID); + return MessageUtil.propertyExists(message, name); } public boolean getBooleanProperty(final String name) throws JMSException @@ -633,7 +575,7 @@ public class HornetQMessage implements javax.jms.Message public int getIntProperty(final String name) throws JMSException { - if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name)) + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return message.getDeliveryCount(); } @@ -650,7 +592,7 @@ public class HornetQMessage implements javax.jms.Message public long getLongProperty(final String name) throws JMSException { - if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name)) + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return message.getDeliveryCount(); } @@ -691,14 +633,14 @@ public class HornetQMessage implements javax.jms.Message public String getStringProperty(final String name) throws JMSException { - if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name)) + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return String.valueOf(message.getDeliveryCount()); } try { - if (HornetQMessage.JMSXGROUPID.equals(name)) + if (MessageUtil.JMSXGROUPID.equals(name)) { return message.getStringProperty(org.hornetq.api.core.Message.HDR_GROUP_ID); } @@ -715,7 +657,7 @@ public class HornetQMessage implements javax.jms.Message public Object getObjectProperty(final String name) throws JMSException { - if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name)) + if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) { return String.valueOf(message.getDeliveryCount()); } @@ -732,20 +674,7 @@ public class HornetQMessage implements javax.jms.Message @Override public Enumeration getPropertyNames() throws JMSException { - HashSet<String> set = new HashSet<String>(); - - for (SimpleString propName : message.getPropertyNames()) - { - if ((!propName.startsWith(HornetQMessage.JMS) || propName.startsWith(HornetQMessage.JMSX) || - propName.startsWith(HornetQMessage.JMS_)) && !propName.startsWith(HornetQConnection.CONNECTION_ID_PROPERTY_NAME)) - { - set.add(propName.toString()); - } - } - - set.add(HornetQMessage.JMSXDELIVERYCOUNT); - - return Collections.enumeration(set); + return Collections.enumeration(MessageUtil.getPropertyNames(message)); } public void setBooleanProperty(final String name, final boolean value) throws JMSException @@ -795,7 +724,7 @@ public class HornetQMessage implements javax.jms.Message { checkProperty(name); - if (HornetQMessage.JMSXGROUPID.equals(name)) + if (MessageUtil.JMSXGROUPID.equals(name)) { message.putStringProperty(org.hornetq.api.core.Message.HDR_GROUP_ID, SimpleString.toSimpleString(value)); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java index 23a49d0..ff1e452 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQObjectMessage.java @@ -164,7 +164,7 @@ public class HornetQObjectMessage extends HornetQMessage implements ObjectMessag } @Override - public void clearBody() + public void clearBody() throws JMSException { super.clearBody(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java ---------------------------------------------------------------------- diff --git a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java index b496dc0..46ec8f0 100644 --- a/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java +++ b/hornetq-jms-client/src/main/java/org/hornetq/jms/client/HornetQQueue.java @@ -60,7 +60,7 @@ public class HornetQQueue extends HornetQDestination implements Queue super(address, name, temporary, true, session); } - protected HornetQQueue(final String address, final String name) + public HornetQQueue(final String address, final String name) { super(address, name, false, true, null); }
