http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java new file mode 100644 index 0000000..a63f787 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/client/impl/TopologyMemberImpl.java @@ -0,0 +1,138 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.client.impl; + +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.TopologyMember; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * Created Aug 16, 2010 + */ +public final class TopologyMemberImpl implements TopologyMember +{ + private static final long serialVersionUID = 1123652191795626133L; + + private final Pair<TransportConfiguration, TransportConfiguration> connector; + + private final String backupGroupName; + + private final String scaleDownGroupName; + + /** + * transient to avoid serialization changes + */ + private transient long uniqueEventID = System.currentTimeMillis(); + + private final String nodeId; + + public TopologyMemberImpl(String nodeId, final String backupGroupName, final String scaleDownGroupName, final TransportConfiguration a, + final TransportConfiguration b) + { + this.nodeId = nodeId; + this.backupGroupName = backupGroupName; + this.scaleDownGroupName = scaleDownGroupName; + this.connector = new Pair<TransportConfiguration, TransportConfiguration>(a, b); + uniqueEventID = System.currentTimeMillis(); + } + + @Override + public TransportConfiguration getLive() + { + return connector.getA(); + } + + @Override + public TransportConfiguration getBackup() + { + return connector.getB(); + } + + public void setBackup(final TransportConfiguration param) + { + connector.setB(param); + } + + public void setLive(final TransportConfiguration param) + { + connector.setA(param); + } + + @Override + public String getNodeId() + { + return nodeId; + } + + @Override + public long getUniqueEventID() + { + return uniqueEventID; + } + + @Override + public String getBackupGroupName() + { + return backupGroupName; + } + + @Override + public String getScaleDownGroupName() + { + return scaleDownGroupName; + } + + /** + * @param 
uniqueEventID the uniqueEventID to set + */ + public void setUniqueEventID(final long uniqueEventID) + { + this.uniqueEventID = uniqueEventID; + } + + public Pair<TransportConfiguration, TransportConfiguration> getConnector() + { + return connector; + } + + + public boolean isMember(RemotingConnection connection) + { + TransportConfiguration connectorConfig = connection.getTransportConnection() != null ? connection.getTransportConnection().getConnectorConfig() : null; + + return isMember(connectorConfig); + + } + + public boolean isMember(TransportConfiguration configuration) + { + if (getConnector().getA() != null && getConnector().getA().equals(configuration) || + getConnector().getB() != null && getConnector().getB().equals(configuration)) + { + return true; + } + else + { + return false; + } + } + + + @Override + public String toString() + { + return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]"; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java new file mode 100644 index 0000000..e5bfae4 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryEntry.java @@ -0,0 +1,58 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.cluster; + +import org.apache.activemq6.api.core.TransportConfiguration; + +/** + * A DiscoveryEntry + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + * + */ +public class DiscoveryEntry +{ + private final String nodeID; + private final TransportConfiguration connector; + private final long lastUpdate; + + + public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate) + { + this.nodeID = nodeID; + this.connector = connector; + this.lastUpdate = lastUpdate; + } + + public String getNodeID() + { + return nodeID; + } + + public TransportConfiguration getConnector() + { + return connector; + } + + public long getLastUpdate() + { + return lastUpdate; + } + + @Override + public String toString() + { + return "DiscoveryEntry[nodeID=" + nodeID + ", connector=" + connector + ", lastUpdate=" + lastUpdate + "]"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java new file mode 100644 index 0000000..fd2ae35 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryGroup.java @@ -0,0 +1,432 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.cluster; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq6.api.core.BroadcastEndpoint; +import org.apache.activemq6.api.core.BroadcastEndpointFactory; +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.management.CoreNotificationType; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.server.HornetQComponent; +import org.apache.activemq6.core.server.management.Notification; +import org.apache.activemq6.core.server.management.NotificationService; +import org.apache.activemq6.utils.TypedProperties; + +/** + * This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}. + * <p> + * There are two current implementations, and that's probably all we will ever need. + * <p> + * We will probably keep both interfaces for a while as UDP is a simple solution requiring no extra dependencies which + * is suitable for users looking for embedded solutions. 
+ * <p> + * Created 17 Nov 2008 13:21:45 + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author Clebert Suconic + */ +public final class DiscoveryGroup implements HornetQComponent +{ + private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + + private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>(); + + private final String name; + + private Thread thread; + + private boolean received; + + private final Object waitLock = new Object(); + + private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<String, DiscoveryEntry>(); + + private final long timeout; + + private volatile boolean started; + + private final String nodeID; + + private final Map<String, String> uniqueIDMap = new HashMap<String, String>(); + + private final BroadcastEndpoint endpoint; + + private final NotificationService notificationService; + + /** + * This is the main constructor, intended to be used + * + * @param nodeID + * @param name + * @param timeout + * @param endpointFactory + * @param service + * @throws Exception + */ + public DiscoveryGroup(final String nodeID, final String name, final long timeout, + BroadcastEndpointFactory endpointFactory, + NotificationService service) throws Exception + { + this.nodeID = nodeID; + this.name = name; + this.timeout = timeout; + this.endpoint = endpointFactory.createBroadcastEndpoint(); + this.notificationService = service; + } + + public synchronized void start() throws Exception + { + if (started) + { + return; + } + + endpoint.openClient(); + + started = true; + + thread = new Thread(new DiscoveryRunnable(), "hornetq-discovery-group-thread-" + name); + + thread.setDaemon(true); + + thread.start(); + + if (notificationService != null) + { + TypedProperties props = new TypedProperties(); + + props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); + + Notification notification = new Notification(nodeID, 
CoreNotificationType.DISCOVERY_GROUP_STARTED, props); + + notificationService.sendNotification(notification); + } + } + + public void stop() + { + synchronized (this) + { + if (!started) + { + return; + } + + started = false; + } + + synchronized (waitLock) + { + waitLock.notifyAll(); + } + + try + { + endpoint.close(false); + } + catch (Exception e1) + { + HornetQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1); + } + + try + { + thread.interrupt(); + thread.join(10000); + if (thread.isAlive()) + { + HornetQClientLogger.LOGGER.timedOutStoppingDiscovery(); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + thread = null; + + if (notificationService != null) + { + TypedProperties props = new TypedProperties(); + props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); + Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props); + try + { + notificationService.sendNotification(notification); + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e); + } + } + } + + public boolean isStarted() + { + return started; + } + + public String getName() + { + return name; + } + + public synchronized List<DiscoveryEntry> getDiscoveryEntries() + { + List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>(connectors.values()); + + return list; + } + + public boolean waitForBroadcast(final long timeout) + { + synchronized (waitLock) + { + long start = System.currentTimeMillis(); + + long toWait = timeout; + + while (started && !received && (toWait > 0 || timeout == 0)) + { + try + { + waitLock.wait(toWait); + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + + if (timeout != 0) + { + long now = System.currentTimeMillis(); + + toWait -= now - start; + + start = now; + } + } + + boolean ret = received; + + received = false; + + return ret; + } + } + + /* + * This 
is a sanity check to catch any cases where two different nodes are broadcasting the same node id either + * due to misconfiguration or problems in failover + */ + private void checkUniqueID(final String originatingNodeID, final String uniqueID) + { + String currentUniqueID = uniqueIDMap.get(originatingNodeID); + + if (currentUniqueID == null) + { + uniqueIDMap.put(originatingNodeID, uniqueID); + } + else + { + if (!currentUniqueID.equals(uniqueID)) + { + HornetQClientLogger.LOGGER.multipleServersBroadcastingSameNode(originatingNodeID); + uniqueIDMap.put(originatingNodeID, uniqueID); + } + } + } + + class DiscoveryRunnable implements Runnable + { + public void run() + { + try + { + byte[] data = null; + + while (started) + { + try + { + + data = endpoint.receiveBroadcast(); + if (data == null) + { + if (started) + { + // This is totally unexpected, so I'm not even bothering on creating + // a log entry for that + HornetQClientLogger.LOGGER.warn("Unexpected null data received from DiscoveryEndpoint"); + } + break; + } + } + catch (Exception e) + { + if (!started) + { + return; + } + else + { + HornetQClientLogger.LOGGER.errorReceivingPAcketInDiscovery(e); + } + } + + HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(data); + + String originatingNodeID = buffer.readString(); + + String uniqueID = buffer.readString(); + + checkUniqueID(originatingNodeID, uniqueID); + + if (nodeID.equals(originatingNodeID)) + { + if (checkExpiration()) + { + callListeners(); + } + // Ignore traffic from own node + continue; + } + + int size = buffer.readInt(); + + boolean changed = false; + + DiscoveryEntry[] entriesRead = new DiscoveryEntry[size]; + // Will first decode all the elements outside of any lock + for (int i = 0; i < size; i++) + { + TransportConfiguration connector = new TransportConfiguration(); + + connector.decode(buffer); + + entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis()); + } + + synchronized (DiscoveryGroup.this) + { + 
for (DiscoveryEntry entry : entriesRead) + { + if (connectors.put(originatingNodeID, entry) == null) + { + changed = true; + } + } + + changed = changed || checkExpiration(); + } + //only call the listeners if we have changed + //also make sure that we aren't stopping to avoid deadlock + if (changed && started) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Connectors changed on Discovery:"); + for (DiscoveryEntry connector : connectors.values()) + { + HornetQClientLogger.LOGGER.trace(connector); + } + } + callListeners(); + } + + synchronized (waitLock) + { + received = true; + + waitLock.notifyAll(); + } + } + } + catch (Exception e) + { + HornetQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); + } + } + + } + + public synchronized void registerListener(final DiscoveryListener listener) + { + listeners.add(listener); + + if (!connectors.isEmpty()) + { + listener.connectorsChanged(getDiscoveryEntries()); + } + } + + public synchronized void unregisterListener(final DiscoveryListener listener) + { + listeners.remove(listener); + } + + private void callListeners() + { + for (DiscoveryListener listener : listeners) + { + try + { + listener.connectorsChanged(getDiscoveryEntries()); + } + catch (Throwable t) + { + // Catch it so exception doesn't prevent other listeners from running + HornetQClientLogger.LOGGER.failedToCallListenerInDiscovery(t); + } + } + } + + private boolean checkExpiration() + { + boolean changed = false; + long now = System.currentTimeMillis(); + + Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator(); + + // Weed out any expired connectors + + while (iter.hasNext()) + { + Map.Entry<String, DiscoveryEntry> entry = iter.next(); + + if (entry.getValue().getLastUpdate() + timeout <= now) + { + if (isTrace) + { + HornetQClientLogger.LOGGER.trace("Timed out node on discovery:" + entry.getValue()); + } + iter.remove(); + + changed = true; + } + } + + return changed; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java new file mode 100644 index 0000000..032ba68 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/cluster/DiscoveryListener.java @@ -0,0 +1,30 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.cluster; + +import java.util.List; + +/** + * To be called any time Discovery changes its list of nodes. 
+ * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author Clebert Suconic + * + * Created 17 Nov 2008 14:30:39 + * + * + */ +public interface DiscoveryListener +{ + void connectorsChanged(List<DiscoveryEntry> newConnectors); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java new file mode 100644 index 0000000..f72f2b2 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/exception/HornetQXAException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.exception; + +import javax.transaction.xa.XAException; + +/** + * A HornetQXAException + * + * @author Tim Fox + * + * + */ +public class HornetQXAException extends XAException +{ + private static final long serialVersionUID = 6535914602965015803L; + + public HornetQXAException(final int errorCode, final String message) + { + super(message); + + this.errorCode = errorCode; + } + + public HornetQXAException(final int errorCode) + { + super(errorCode); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java new file mode 100644 index 0000000..923beb1 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/BodyEncoder.java @@ -0,0 +1,54 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.message; + +import java.nio.ByteBuffer; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; + +/** + * Class used to encode message body into buffers. 
+ * <br> + * Used to send large streams over the wire + * + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public interface BodyEncoder +{ + /** + * This method must not be called directly by HornetQ clients. + */ + void open() throws HornetQException; + + /** + * This method must not be called directly by HornetQ clients. + */ + void close() throws HornetQException; + + /** + * This method must not be called directly by HornetQ clients. + */ + int encode(ByteBuffer bufferRead) throws HornetQException; + + /** + * This method must not be called directly by HornetQ clients. + */ + int encode(HornetQBuffer bufferOut, int size) throws HornetQException; + + /** + * This method must not be called directly by HornetQ clients. + */ + long getLargeBodySize(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java new file mode 100644 index 0000000..b797f38 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageImpl.java @@ -0,0 +1,1126 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.message.impl; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQPropertyConversionException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.buffers.impl.ResetLimitWrappedHornetQBuffer; +import org.apache.activemq6.core.message.BodyEncoder; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; +import org.apache.activemq6.utils.ByteUtil; +import org.apache.activemq6.utils.DataConstants; +import org.apache.activemq6.utils.TypedProperties; +import org.apache.activemq6.utils.UUID; + +/** + * A concrete implementation of a message + * <p> + * All messages handled by HornetQ core are of this type + * + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version <tt>$Revision: 2740 $</tt> + */ +public abstract class MessageImpl implements MessageInternal +{ + public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO"); + + public static final SimpleString HDR_SCALEDOWN_TO_IDS = new SimpleString("_HQ_SCALEDOWN_TO"); + + public static final SimpleString HDR_ROUTE_TO_ACK_IDS = new SimpleString("_HQ_ACK_ROUTE_TO"); + + // used by the bridges to set duplicates + public static final SimpleString HDR_BRIDGE_DUPLICATE_ID = new SimpleString("_HQ_BRIDGE_DUP"); + + public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; + + public static 
final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT; + + protected long messageID; + + protected SimpleString address; + + protected byte type; + + protected boolean durable; + + /** + * GMT milliseconds at which this message expires. 0 means never expires * + */ + private long expiration; + + protected long timestamp; + + protected TypedProperties properties; + + protected byte priority; + + protected HornetQBuffer buffer; + + protected ResetLimitWrappedHornetQBuffer bodyBuffer; + + protected volatile boolean bufferValid; + + private int endOfBodyPosition = -1; + + private int endOfMessagePosition; + + private boolean copied = true; + + private boolean bufferUsed; + + private UUID userID; + + // Constructors -------------------------------------------------- + + protected MessageImpl() + { + properties = new TypedProperties(); + } + + /** + * overridden by the client message, we need access to the connection so we can create the appropriate HornetQBuffer. + * + * @param type + * @param durable + * @param expiration + * @param timestamp + * @param priority + * @param initialMessageBufferSize + */ + protected MessageImpl(final byte type, + final boolean durable, + final long expiration, + final long timestamp, + final byte priority, + final int initialMessageBufferSize) + { + this(); + this.type = type; + this.durable = durable; + this.expiration = expiration; + this.timestamp = timestamp; + this.priority = priority; + createBody(initialMessageBufferSize); + } + + protected MessageImpl(final int initialMessageBufferSize) + { + this(); + createBody(initialMessageBufferSize); + } + + /* + * Copy constructor + */ + protected MessageImpl(final MessageImpl other) + { + this(other, other.getProperties()); + } + + /* + * Copy constructor + */ + protected MessageImpl(final MessageImpl other, TypedProperties properties) + { + messageID = other.getMessageID(); + userID = other.getUserID(); + address = other.getAddress(); + type = other.getType(); + durable = 
other.isDurable(); + expiration = other.getExpiration(); + timestamp = other.getTimestamp(); + priority = other.getPriority(); + this.properties = new TypedProperties(properties); + + // This MUST be synchronized using the monitor on the other message to prevent it running concurrently + // with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to + // many subscriptions and bridging to other nodes in a cluster + synchronized (other) + { + bufferValid = other.bufferValid; + endOfBodyPosition = other.endOfBodyPosition; + endOfMessagePosition = other.endOfMessagePosition; + copied = other.copied; + + if (other.buffer != null) + { + other.bufferUsed = true; + + // We need to copy the underlying buffer too, since the different messsages thereafter might have different + // properties set on them, making their encoding different + buffer = other.buffer.copy(0, other.buffer.writerIndex()); + + buffer.setIndex(other.buffer.readerIndex(), buffer.capacity()); + } + } + } + + // Message implementation ---------------------------------------- + + public int getEncodeSize() + { + int headersPropsSize = getHeadersAndPropertiesEncodeSize(); + + int bodyPos = getEndOfBodyPosition(); + + int bodySize = bodyPos - BUFFER_HEADER_SPACE - DataConstants.SIZE_INT; + + return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT + headersPropsSize; + } + + public int getHeadersAndPropertiesEncodeSize() + { + return DataConstants.SIZE_LONG + // Message ID + DataConstants.SIZE_BYTE + // user id null? + (userID == null ? 
0 : 16) + + /* address */SimpleString.sizeofNullableString(address) + + DataConstants./* Type */SIZE_BYTE + + DataConstants./* Durable */SIZE_BOOLEAN + + DataConstants./* Expiration */SIZE_LONG + + DataConstants./* Timestamp */SIZE_LONG + + DataConstants./* Priority */SIZE_BYTE + + /* PropertySize and Properties */properties.getEncodeSize(); + } + + + public void encodeHeadersAndProperties(final HornetQBuffer buffer) + { + buffer.writeLong(messageID); + buffer.writeNullableSimpleString(address); + if (userID == null) + { + buffer.writeByte(DataConstants.NULL); + } + else + { + buffer.writeByte(DataConstants.NOT_NULL); + buffer.writeBytes(userID.asBytes()); + } + buffer.writeByte(type); + buffer.writeBoolean(durable); + buffer.writeLong(expiration); + buffer.writeLong(timestamp); + buffer.writeByte(priority); + properties.encode(buffer); + } + + public void decodeHeadersAndProperties(final HornetQBuffer buffer) + { + messageID = buffer.readLong(); + address = buffer.readNullableSimpleString(); + if (buffer.readByte() == DataConstants.NOT_NULL) + { + byte[] bytes = new byte[16]; + buffer.readBytes(bytes); + userID = new UUID(UUID.TYPE_TIME_BASED, bytes); + } + else + { + userID = null; + } + type = buffer.readByte(); + durable = buffer.readBoolean(); + expiration = buffer.readLong(); + timestamp = buffer.readLong(); + priority = buffer.readByte(); + properties.decode(buffer); + } + + public void copyHeadersAndProperties(final MessageInternal msg) + { + messageID = msg.getMessageID(); + address = msg.getAddress(); + userID = msg.getUserID(); + type = msg.getType(); + durable = msg.isDurable(); + expiration = msg.getExpiration(); + timestamp = msg.getTimestamp(); + priority = msg.getPriority(); + properties = msg.getTypedProperties(); + } + + public HornetQBuffer getBodyBuffer() + { + if (bodyBuffer == null) + { + bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this); + } + + return bodyBuffer; + } + + public Message writeBodyBufferBytes(byte[] 
bytes) + { + getBodyBuffer().writeBytes(bytes); + + return this; + } + + public Message writeBodyBufferString(String string) + { + getBodyBuffer().writeString(string); + + return this; + } + + public void checkCompletion() throws HornetQException + { + // no op on regular messages + } + + + public synchronized HornetQBuffer getBodyBufferCopy() + { + // Must copy buffer before sending it + + HornetQBuffer newBuffer = buffer.copy(0, buffer.capacity()); + + newBuffer.setIndex(0, getEndOfBodyPosition()); + + return new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, newBuffer, null); + } + + public long getMessageID() + { + return messageID; + } + + public UUID getUserID() + { + return userID; + } + + public MessageImpl setUserID(final UUID userID) + { + this.userID = userID; + return this; + } + + /** + * this doesn't need to be synchronized as setAddress is protecting the buffer, + * not the address + */ + public SimpleString getAddress() + { + return address; + } + + /** + * The only reason this is synchronized is because of encoding a message versus invalidating the buffer. + * This synchronization can probably be removed since setAddress is always called from a single thread. 
+ * However I will keep it as it's harmless and it's been well tested + */ + public Message setAddress(final SimpleString address) + { + // This is protecting the buffer + synchronized (this) + { + if (this.address != address) + { + this.address = address; + + bufferValid = false; + } + } + + return this; + } + + public byte getType() + { + return type; + } + + public void setType(byte type) + { + this.type = type; + } + + public boolean isDurable() + { + return durable; + } + + public MessageImpl setDurable(final boolean durable) + { + if (this.durable != durable) + { + this.durable = durable; + + bufferValid = false; + } + return this; + } + + public long getExpiration() + { + return expiration; + } + + public MessageImpl setExpiration(final long expiration) + { + if (this.expiration != expiration) + { + this.expiration = expiration; + + bufferValid = false; + } + return this; + } + + public long getTimestamp() + { + return timestamp; + } + + public MessageImpl setTimestamp(final long timestamp) + { + if (this.timestamp != timestamp) + { + this.timestamp = timestamp; + + bufferValid = false; + } + return this; + } + + public byte getPriority() + { + return priority; + } + + public MessageImpl setPriority(final byte priority) + { + if (this.priority != priority) + { + this.priority = priority; + + bufferValid = false; + } + return this; + } + + public boolean isExpired() + { + if (expiration == 0) + { + return false; + } + + return System.currentTimeMillis() - expiration >= 0; + } + + public Map<String, Object> toMap() + { + Map<String, Object> map = new HashMap<String, Object>(); + + map.put("messageID", messageID); + if (userID != null) + { + map.put("userID", "ID:" + userID.toString()); + } + map.put("address", address.toString()); + map.put("type", type); + map.put("durable", durable); + map.put("expiration", expiration); + map.put("timestamp", timestamp); + map.put("priority", priority); + for (SimpleString propName : properties.getPropertyNames()) + { + 
map.put(propName.toString(), properties.getProperty(propName)); + } + return map; + } + + public void decodeFromBuffer(final HornetQBuffer buffer) + { + this.buffer = buffer; + + decode(); + } + + public void bodyChanged() + { + // If the body is changed we must copy the buffer otherwise can affect the previously sent message + // which might be in the Netty write queue + checkCopy(); + + bufferValid = false; + + endOfBodyPosition = -1; + } + + public synchronized void checkCopy() + { + if (!copied) + { + forceCopy(); + + copied = true; + } + } + + public synchronized void resetCopied() + { + copied = false; + } + + public int getEndOfMessagePosition() + { + return endOfMessagePosition; + } + + public int getEndOfBodyPosition() + { + if (endOfBodyPosition < 0) + { + endOfBodyPosition = buffer.writerIndex(); + } + return endOfBodyPosition; + } + + // Encode to journal or paging + public void encode(final HornetQBuffer buff) + { + encodeToBuffer(); + + buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition - BUFFER_HEADER_SPACE); + } + + // Decode from journal or paging + public void decode(final HornetQBuffer buff) + { + int start = buff.readerIndex(); + + endOfBodyPosition = buff.readInt(); + + endOfMessagePosition = buff.getInt(endOfBodyPosition - BUFFER_HEADER_SPACE + start); + + int length = endOfMessagePosition - BUFFER_HEADER_SPACE; + + buffer.setIndex(0, BUFFER_HEADER_SPACE); + + buffer.writeBytes(buff, start, length); + + decode(); + + buff.readerIndex(start + length); + } + + public synchronized HornetQBuffer getEncodedBuffer() + { + HornetQBuffer buff = encodeToBuffer(); + + if (bufferUsed) + { + HornetQBuffer copied = buff.copy(0, buff.capacity()); + + copied.setIndex(0, endOfMessagePosition); + + return copied; + } + else + { + buffer.setIndex(0, endOfMessagePosition); + + bufferUsed = true; + + return buffer; + } + } + + public void setAddressTransient(final SimpleString address) + { + this.address = address; + } + + + // Properties + // 
--------------------------------------------------------------------------------------- + + public Message putBooleanProperty(final SimpleString key, final boolean value) + { + properties.putBooleanProperty(key, value); + + bufferValid = false; + + return this; + } + + public Message putByteProperty(final SimpleString key, final byte value) + { + properties.putByteProperty(key, value); + + bufferValid = false; + + return this; + } + + public Message putBytesProperty(final SimpleString key, final byte[] value) + { + properties.putBytesProperty(key, value); + + bufferValid = false; + + return this; + } + + @Override + public Message putCharProperty(SimpleString key, char value) + { + properties.putCharProperty(key, value); + bufferValid = false; + + return this; + } + + @Override + public Message putCharProperty(String key, char value) + { + properties.putCharProperty(new SimpleString(key), value); + bufferValid = false; + + return this; + } + + public Message putShortProperty(final SimpleString key, final short value) + { + properties.putShortProperty(key, value); + bufferValid = false; + + return this; + } + + public Message putIntProperty(final SimpleString key, final int value) + { + properties.putIntProperty(key, value); + bufferValid = false; + + return this; + } + + public Message putLongProperty(final SimpleString key, final long value) + { + properties.putLongProperty(key, value); + bufferValid = false; + + return this; + } + + public Message putFloatProperty(final SimpleString key, final float value) + { + properties.putFloatProperty(key, value); + + bufferValid = false; + + return this; + } + + public Message putDoubleProperty(final SimpleString key, final double value) + { + properties.putDoubleProperty(key, value); + + bufferValid = false; + + return this; + } + + public Message putStringProperty(final SimpleString key, final SimpleString value) + { + properties.putSimpleStringProperty(key, value); + + bufferValid = false; + + return this; + } + + public 
Message putObjectProperty(final SimpleString key, final Object value) throws HornetQPropertyConversionException + { + TypedProperties.setObjectProperty(key, value, properties); + bufferValid = false; + + return this; + } + + public Message putObjectProperty(final String key, final Object value) throws HornetQPropertyConversionException + { + putObjectProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putBooleanProperty(final String key, final boolean value) + { + properties.putBooleanProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putByteProperty(final String key, final byte value) + { + properties.putByteProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putBytesProperty(final String key, final byte[] value) + { + properties.putBytesProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putShortProperty(final String key, final short value) + { + properties.putShortProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putIntProperty(final String key, final int value) + { + properties.putIntProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putLongProperty(final String key, final long value) + { + properties.putLongProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putFloatProperty(final String key, final float value) + { + properties.putFloatProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putDoubleProperty(final String key, final double value) + { + properties.putDoubleProperty(new SimpleString(key), value); + + bufferValid = false; + + return this; + } + + public Message putStringProperty(final String key, final String value) + { + 
properties.putSimpleStringProperty(new SimpleString(key), SimpleString.toSimpleString(value)); + + bufferValid = false; + + return this; + } + + public Message putTypedProperties(final TypedProperties otherProps) + { + properties.putTypedProperties(otherProps); + + bufferValid = false; + + return this; + } + + public Object getObjectProperty(final SimpleString key) + { + return properties.getProperty(key); + } + + public Boolean getBooleanProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getBooleanProperty(key); + } + + public Boolean getBooleanProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getBooleanProperty(new SimpleString(key)); + } + + public Byte getByteProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getByteProperty(key); + } + + public Byte getByteProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getByteProperty(new SimpleString(key)); + } + + public byte[] getBytesProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getBytesProperty(key); + } + + public byte[] getBytesProperty(final String key) throws HornetQPropertyConversionException + { + return getBytesProperty(new SimpleString(key)); + } + + public Double getDoubleProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getDoubleProperty(key); + } + + public Double getDoubleProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getDoubleProperty(new SimpleString(key)); + } + + public Integer getIntProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getIntProperty(key); + } + + public Integer getIntProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getIntProperty(new SimpleString(key)); + } + + public Long 
getLongProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getLongProperty(key); + } + + public Long getLongProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getLongProperty(new SimpleString(key)); + } + + public Short getShortProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getShortProperty(key); + } + + public Short getShortProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getShortProperty(new SimpleString(key)); + } + + public Float getFloatProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getFloatProperty(key); + } + + public Float getFloatProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getFloatProperty(new SimpleString(key)); + } + + public String getStringProperty(final SimpleString key) throws HornetQPropertyConversionException + { + SimpleString str = getSimpleStringProperty(key); + + if (str == null) + { + return null; + } + else + { + return str.toString(); + } + } + + public String getStringProperty(final String key) throws HornetQPropertyConversionException + { + return getStringProperty(new SimpleString(key)); + } + + public SimpleString getSimpleStringProperty(final SimpleString key) throws HornetQPropertyConversionException + { + return properties.getSimpleStringProperty(key); + } + + public SimpleString getSimpleStringProperty(final String key) throws HornetQPropertyConversionException + { + return properties.getSimpleStringProperty(new SimpleString(key)); + } + + public Object getObjectProperty(final String key) + { + return properties.getProperty(new SimpleString(key)); + } + + public Object removeProperty(final SimpleString key) + { + bufferValid = false; + + return properties.removeProperty(key); + } + + public Object removeProperty(final String key) + { + bufferValid = 
false; + + return properties.removeProperty(new SimpleString(key)); + } + + public boolean containsProperty(final SimpleString key) + { + return properties.containsProperty(key); + } + + public boolean containsProperty(final String key) + { + return properties.containsProperty(new SimpleString(key)); + } + + public Set<SimpleString> getPropertyNames() + { + return properties.getPropertyNames(); + } + + public HornetQBuffer getWholeBuffer() + { + return buffer; + } + + public BodyEncoder getBodyEncoder() throws HornetQException + { + return new DecodingContext(); + } + + public TypedProperties getTypedProperties() + { + return this.properties; + } + + @Override + public boolean equals(Object other) + { + + if (this == other) + { + return true; + } + + if (other instanceof MessageImpl) + { + MessageImpl message = (MessageImpl) other; + + if (this.getMessageID() == message.getMessageID()) + return true; + } + + return false; + } + + /** + * Debug Helper!!!! + * + * I'm leaving this message here without any callers for a reason: + * During debugs it's important eventually to identify what's on the bodies, and this method will give you a good idea about them. + * Add the message.bodyToString() to the Watch variables on the debugger view and this will show up like a charm!!! 
+ * @return + */ + public String bodyToString() + { + getEndOfBodyPosition(); + int readerIndex1 = this.buffer.readerIndex(); + buffer.readerIndex(0); + byte[] buffer1 = new byte[buffer.writerIndex()]; + buffer.readBytes(buffer1); + buffer.readerIndex(readerIndex1); + + byte[] buffer2 = null; + if (bodyBuffer != null) + { + int readerIndex2 = this.bodyBuffer.readerIndex(); + bodyBuffer.readerIndex(0); + buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()]; + bodyBuffer.readBytes(buffer2); + bodyBuffer.readerIndex(readerIndex2); + } + + return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1); + } + + + + + @Override + public int hashCode() + { + return 31 + (int)(messageID ^ (messageID >>> 32)); + } + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + public TypedProperties getProperties() + { + return properties; + } + + // This must be synchronized as it can be called concurrently if the message is being delivered + // concurrently to + // many queues - the first caller in this case will actually encode it + private synchronized HornetQBuffer encodeToBuffer() + { + if (!bufferValid) + { + if (bufferUsed) + { + // Cannot use same buffer - must copy + + forceCopy(); + } + + int bodySize = getEndOfBodyPosition(); + + // Clebert: I've started sending this on encoding due to conversions between protocols + // and making sure we are not losing the buffer start position between protocols + this.endOfBodyPosition = bodySize; + + // write it + buffer.setInt(BUFFER_HEADER_SPACE, bodySize); + + // Position at end of body and skip past the message end position 
int. + // check for enough room in the buffer even though it is dynamic + if ((bodySize + 4) > buffer.capacity()) + { + buffer.setIndex(0, bodySize); + buffer.writeInt(0); + } + else + { + buffer.setIndex(0, bodySize + DataConstants.SIZE_INT); + } + + encodeHeadersAndProperties(buffer); + + // Write end of message position + + endOfMessagePosition = buffer.writerIndex(); + + buffer.setInt(bodySize, endOfMessagePosition); + + bufferValid = true; + } + + return buffer; + } + + private void decode() + { + endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE); + + buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT); + + decodeHeadersAndProperties(buffer); + + endOfMessagePosition = buffer.readerIndex(); + + bufferValid = true; + } + + public void createBody(final int initialMessageBufferSize) + { + buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize); + + // There's a bug in netty which means a dynamic buffer won't resize until you write a byte + buffer.writeByte((byte) 0); + + buffer.setIndex(BODY_OFFSET, BODY_OFFSET); + } + + private void forceCopy() + { + // Must copy buffer before sending it + + buffer = buffer.copy(0, buffer.capacity()); + + buffer.setIndex(0, getEndOfBodyPosition()); + + if (bodyBuffer != null) + { + bodyBuffer.setBuffer(buffer); + } + + bufferUsed = false; + } + + // Inner classes ------------------------------------------------- + + private final class DecodingContext implements BodyEncoder + { + private int lastPos = 0; + + public DecodingContext() + { + } + + public void open() + { + } + + public void close() + { + } + + public long getLargeBodySize() + { + return buffer.writerIndex(); + } + + public int encode(final ByteBuffer bufferRead) throws HornetQException + { + HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead); + return encode(buffer, bufferRead.capacity()); + } + + public int encode(final HornetQBuffer bufferOut, final int size) + { + bufferOut.writeBytes(getWholeBuffer(), lastPos, size); + 
lastPos += size; + return size; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java new file mode 100644 index 0000000..b341c25 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/message/impl/MessageInternal.java @@ -0,0 +1,64 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.message.impl; + +import java.io.InputStream; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.message.BodyEncoder; +import org.apache.activemq6.utils.TypedProperties; + +/** + * A MessageInternal + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + * + */ +public interface MessageInternal extends Message +{ + void decodeFromBuffer(HornetQBuffer buffer); + + int getEndOfMessagePosition(); + + int getEndOfBodyPosition(); + + void checkCopy(); + + void bodyChanged(); + + void resetCopied(); + + boolean isServerMessage(); + + HornetQBuffer getEncodedBuffer(); + + int getHeadersAndPropertiesEncodeSize(); + + HornetQBuffer getWholeBuffer(); + + void encodeHeadersAndProperties(HornetQBuffer buffer); + + void decodeHeadersAndProperties(HornetQBuffer buffer); + + BodyEncoder getBodyEncoder() throws HornetQException; + + InputStream getBodyInputStream(); + + void setAddressTransient(SimpleString address); + + TypedProperties getTypedProperties(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java new file mode 100644 index 0000000..0f2f1e7 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/ClientPacketDecoder.java @@ -0,0 +1,71 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol; + +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; +import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.client.impl.ClientLargeMessageImpl; +import org.apache.activemq6.core.client.impl.ClientMessageImpl; +import org.apache.activemq6.core.protocol.core.Packet; +import org.apache.activemq6.core.protocol.core.impl.PacketDecoder; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage; +import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveMessage; +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 10/12/12 + */ +public class ClientPacketDecoder extends PacketDecoder +{ + private static final long serialVersionUID = 6952614096979334582L; + public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder(); + + @Override + public Packet decode(final HornetQBuffer in) + { + final byte packetType = in.readByte(); + + Packet packet = decode(packetType); + + packet.decode(in); + + return packet; + } + + @Override + public Packet decode(byte packetType) + { + Packet packet; + + switch (packetType) + { + case SESS_RECEIVE_MSG: + { + packet = new SessionReceiveMessage(new ClientMessageImpl()); + 
break; + } + case SESS_RECEIVE_LARGE_MSG: + { + packet = new SessionReceiveClientLargeMessage(new ClientLargeMessageImpl()); + break; + } + default: + { + packet = super.decode(packetType); + } + } + return packet; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java new file mode 100644 index 0000000..4c82ef2 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Channel.java @@ -0,0 +1,202 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core; + +import java.util.concurrent.locks.Lock; + +import org.apache.activemq6.api.core.HornetQException; + +/** + * A channel is a way of interleaving data meant for different endpoints over the same {@link org.apache.activemq6.core.protocol.core.CoreRemotingConnection}. + * <p> + * Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel + * when received by the {@link org.apache.activemq6.core.protocol.core.CoreRemotingConnection}. 
see {@link org.apache.activemq6.core.protocol.core.Packet#setChannelID(long)}. + * <p> + * Each Channel will forward any packets received to its {@link org.apache.activemq6.core.protocol.core.ChannelHandler}. + * <p> + * A Channel *does not* support concurrent access by more than one thread! + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface Channel +{ + /** + * Returns the id of this channel. + * @return the id + */ + long getID(); + + /** For protocol check */ + boolean supports(byte packetID); + + /** + * Sends a packet on this channel. + * @param packet the packet to send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet); + + /** + * Sends a packet on this channel using batching algorithm if appropriate + * @param packet the packet to send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean sendBatched(Packet packet); + + /** + * Sends a packet on this channel and then blocks until it has been written to the connection. + * @param packet the packet to send + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean sendAndFlush(Packet packet); + + /** + * Sends a packet on this channel and then blocks until a response is received or a timeout + * occurs. + * @param packet the packet to send + * @param expectedPacket the packet being expected. + * @return the response + * @throws HornetQException if an error occurs during the send + */ + Packet sendBlocking(Packet packet, byte expectedPacket) throws HornetQException; + + /** + * Sets the {@link org.apache.activemq6.core.protocol.core.ChannelHandler} that this channel should + * forward received packets to. 
+ * @param handler the handler + */ + void setHandler(ChannelHandler handler); + + /** + * Gets the {@link org.apache.activemq6.core.protocol.core.ChannelHandler} that this channel should + * forward received packets to. + * @return the current channel handler + */ + ChannelHandler getHandler(); + + /** + * Closes this channel. + * <p> + * once closed no packets can be sent. + */ + void close(); + + /** + * Transfers the connection used by this channel to the one specified. + * <p> + * All new packets will be sent via this connection. + * @param newConnection the new connection + */ + void transferConnection(CoreRemotingConnection newConnection); + + /** + * resends any packets that have not received confirmations yet. + * <p> + * Typically called after a connection has been transferred. + * + * @param lastConfirmedCommandID the last confirmed packet + */ + void replayCommands(int lastConfirmedCommandID); + + /** + * returns the last confirmed packet command id + * + * @return the id + */ + int getLastConfirmedCommandID(); + + /** + * locks the channel. + * <p> + * While locked no packets can be sent or received + */ + void lock(); + + /** + * unlocks the channel. + */ + void unlock(); + + /** + * forces any {@link org.apache.activemq6.core.protocol.core.Channel#sendBlocking(Packet, byte)} request to return with an exception. + */ + void returnBlocking(); + + /** + * forces any {@link org.apache.activemq6.core.protocol.core.Channel#sendBlocking(Packet, byte)} request to return with an exception. + */ + void returnBlocking(Throwable cause); + + /** + * returns the channel lock + * + * @return the lock + */ + Lock getLock(); + + /** + * returns the {@link CoreRemotingConnection} being used by the channel + */ + CoreRemotingConnection getConnection(); + + /** + * sends a confirmation of a packet being received. + * + * @param packet the packet to confirm + */ + void confirm(Packet packet); + + /** + * sets the handler to use when a confirmation is received. 
+ * + * @param handler the handler to call + */ + void setCommandConfirmationHandler(CommandConfirmationHandler handler); + + /** + * flushes any confirmations on to the connection. + */ + void flushConfirmations(); + + /** + * Called by {@link org.apache.activemq6.core.protocol.core.CoreRemotingConnection} when a packet is received. + * <p> + * This method should then call its {@link org.apache.activemq6.core.protocol.core.ChannelHandler} after appropriate processing of + * the packet + * + * @param packet the packet to process. + */ + void handlePacket(Packet packet); + + /** + * clears any commands from the cache that are yet to be confirmed. + */ + void clearCommands(); + + /** + * returns the confirmation window size this channel is using. + * + * @return the window size + */ + int getConfirmationWindowSize(); + + /** + * notifies the channel if it is transferring its connection. When true it is illegal to send messages. + * + * @param transferring whether the channel is transferring + */ + void setTransferring(boolean transferring); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java new file mode 100644 index 0000000..0fce6da --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/ChannelHandler.java @@ -0,0 +1,30 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core; + + +/** + * A ChannelHandler is used by {@link Channel}. When a channel receives a packet it will call its handler to deal with the + * packet. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface ChannelHandler +{ + /** + * called by the channel when a packet is received. + * + * @param packet the packet received + */ + void handlePacket(Packet packet); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java new file mode 100644 index 0000000..fe7fda9 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CommandConfirmationHandler.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core; + + +/** + * A CommandConfirmationHandler is called by the channel after a confirmation of a packet has been received. + * <p> + * Created 9 Feb 2009 12:39:11 + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface CommandConfirmationHandler +{ + /** + * called by channel after a confirmation has been received. + * + * @param packet the packet confirmed + */ + void commandConfirmed(Packet packet); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java new file mode 100644 index 0000000..513aa70 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/CoreRemotingConnection.java @@ -0,0 +1,104 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core; + +import org.apache.activemq6.core.security.HornetQPrincipal; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + + +/** + * Extension of RemotingConnection for the HornetQ core protocol. + * @author Tim Fox + */ +public interface CoreRemotingConnection extends RemotingConnection +{ + + /** Returns the client protocol version used on the communication. + * This determines whether the client has support for certain packet types. */ + int getClientVersion(); + + /** + * Sets the client protocol version used on the communication. This determines whether the client has + * support for certain packet types. + */ + void setClientVersion(int clientVersion); + + /** + * Returns the channel with the channel id specified. + * <p> + * If it does not exist, it is created with the given confirmation window size. + * @param channelID the channel id + * @param confWindowSize the confirmation window size + * @return the channel + */ + Channel getChannel(long channelID, int confWindowSize); + + /** + * Adds the channel under the specified channel id. + * + * @param channelID the channel id + * @param channel the channel + */ + void putChannel(long channelID, Channel channel); + + /** + * Removes the channel with the specified channel id. + * + * @param channelID the channel id + * @return true if removed + */ + boolean removeChannel(long channelID); + + /** + * Generates a channel id that is unique within this connection. + * + * @return the id + */ + long generateChannelID(); + + /** + * Resets the id generator used to generate ids. 
+ * @param id the first id to set it to + */ + void syncIDGeneratorSequence(long id); + + /** + * Returns the next id to be chosen. + * @return the id + */ + long getIDGeneratorSequence(); + + /** + * Returns the current timeout for blocking calls + * @return the timeout in milliseconds + */ + long getBlockingCallTimeout(); + + /** + * Returns the current timeout for blocking calls during failover (presumably, going by the method name; confirm against implementations) + * @return the timeout in milliseconds + */ + long getBlockingCallFailoverTimeout(); + + /** + * Returns the transfer lock used when transferring connections. + * @return the lock + */ + Object getTransferLock(); + + /** + * Returns the default security principal + * @return the principal + */ + HornetQPrincipal getDefaultHornetQPrincipal(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java new file mode 100644 index 0000000..6fb6359 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/Packet.java @@ -0,0 +1,86 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * A Packet represents a packet of data transmitted over a connection. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface Packet +{ + /** + * Sets the channel id to be used so that, once the packet has been successfully decoded, it is + * sent to the correct channel. + * + * @param channelID the id of the channel to handle the packet + */ + void setChannelID(long channelID); + + /** + * Returns the channel id of the channel that should handle this packet. + * + * @return the id of the channel + */ + long getChannelID(); + + /** + * Returns true if this packet is being sent in response to a previously received packet. + * + * @return true if a response + */ + boolean isResponse(); + + /** + * Returns the type of the packet. + * <p> + * This is needed when decoding the packet. + * + * @return the packet type + */ + byte getType(); + + /** + * Encodes the packet and returns a {@link org.apache.activemq6.api.core.HornetQBuffer} containing the data. + * + * @param connection the connection + * @return the buffer containing the encoded data + */ + HornetQBuffer encode(RemotingConnection connection); + + /** + * Decodes the buffer into this packet. + * + * @param buffer the buffer to decode from + */ + void decode(HornetQBuffer buffer); + + /** + * Returns the size needed to encode this packet. + * + * @return The size of the entire packet including headers, and extra data + */ + int getPacketSize(); + + /** + * Returns true if a confirmation should be sent on receipt of this packet. + * + * @return true if confirmation is required + */ + boolean isRequiresConfirmations(); + + boolean isAsyncExec(); +}
