http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java new file mode 100644 index 0000000..6dd712c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAResumeMessage.java @@ -0,0 +1,88 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import javax.transaction.xa.Xid; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; +import org.apache.activemq6.utils.XidCodecSupport; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class SessionXAResumeMessage extends PacketImpl +{ + + private Xid xid; + + public SessionXAResumeMessage(final Xid xid) + { + super(SESS_XA_RESUME); + + this.xid = xid; + } + + public SessionXAResumeMessage() + { + super(SESS_XA_RESUME); + } + + // Public -------------------------------------------------------- + + public Xid getXid() + { + return xid; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + XidCodecSupport.encodeXid(xid, buffer); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + xid = XidCodecSupport.decodeXid(buffer); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((xid == null) ? 0 : xid.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionXAResumeMessage)) + return false; + SessionXAResumeMessage other = (SessionXAResumeMessage)obj; + if (xid == null) + { + if (other.xid != null) + return false; + } + else if (!xid.equals(other.xid)) + return false; + return true; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java new file mode 100644 index 0000000..fc0679a --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java @@ -0,0 +1,95 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import javax.transaction.xa.Xid; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; +import org.apache.activemq6.utils.XidCodecSupport; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class SessionXARollbackMessage extends PacketImpl +{ + private Xid xid; + + public SessionXARollbackMessage(final Xid xid) + { + super(SESS_XA_ROLLBACK); + + this.xid = xid; + } + + public SessionXARollbackMessage() + { + super(SESS_XA_ROLLBACK); + } + + // Public -------------------------------------------------------- + + public Xid getXid() + { + return xid; + } + + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + XidCodecSupport.encodeXid(xid, buffer); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + xid = XidCodecSupport.decodeXid(buffer); + } + + @Override + public boolean isAsyncExec() + { + return true; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((xid == null) ? 
0 : xid.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionXARollbackMessage)) + return false; + SessionXARollbackMessage other = (SessionXARollbackMessage)obj; + if (xid == null) + { + if (other.xid != null) + return false; + } + else if (!xid.equals(other.xid)) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java new file mode 100644 index 0000000..94b5f28 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutMessage.java @@ -0,0 +1,80 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class SessionXASetTimeoutMessage extends PacketImpl +{ + private int timeoutSeconds; + + public SessionXASetTimeoutMessage(final int timeoutSeconds) + { + super(SESS_XA_SET_TIMEOUT); + + this.timeoutSeconds = timeoutSeconds; + } + + public SessionXASetTimeoutMessage() + { + super(SESS_XA_SET_TIMEOUT); + } + + // Public -------------------------------------------------------- + + public int getTimeoutSeconds() + { + return timeoutSeconds; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeInt(timeoutSeconds); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + timeoutSeconds = buffer.readInt(); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + timeoutSeconds; + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionXASetTimeoutMessage)) + return false; + SessionXASetTimeoutMessage other = (SessionXASetTimeoutMessage)obj; + if (timeoutSeconds != other.timeoutSeconds) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java 
b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java new file mode 100644 index 0000000..4ac036e --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXASetTimeoutResponseMessage.java @@ -0,0 +1,86 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class SessionXASetTimeoutResponseMessage extends PacketImpl +{ + private boolean ok; + + public SessionXASetTimeoutResponseMessage(final boolean ok) + { + super(SESS_XA_SET_TIMEOUT_RESP); + + this.ok = ok; + } + + public SessionXASetTimeoutResponseMessage() + { + super(SESS_XA_SET_TIMEOUT_RESP); + } + + // Public -------------------------------------------------------- + + @Override + public boolean isResponse() + { + return true; + } + + public boolean isOK() + { + return ok; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeBoolean(ok); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + ok = buffer.readBoolean(); + } + + @Override + public int hashCode() + { + final int prime = 31; 
+ int result = super.hashCode(); + result = prime * result + (ok ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionXASetTimeoutResponseMessage)) + return false; + SessionXASetTimeoutResponseMessage other = (SessionXASetTimeoutResponseMessage)obj; + if (ok != other.ok) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java new file mode 100644 index 0000000..ad2a73b --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SessionXAStartMessage.java @@ -0,0 +1,98 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import javax.transaction.xa.Xid; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; +import org.apache.activemq6.utils.XidCodecSupport; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class SessionXAStartMessage extends PacketImpl +{ + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + private Xid xid; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public SessionXAStartMessage(final Xid xid) + { + super(SESS_XA_START); + + this.xid = xid; + } + + public SessionXAStartMessage() + { + super(SESS_XA_START); + } + + // Public -------------------------------------------------------- + + public Xid getXid() + { + return xid; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + XidCodecSupport.encodeXid(xid, buffer); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + xid = XidCodecSupport.decodeXid(buffer); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((xid == null) ? 
0 : xid.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SessionXAStartMessage)) + return false; + SessionXAStartMessage other = (SessionXAStartMessage)obj; + if (xid == null) + { + if (other.xid != null) + return false; + } + else if (!xid.equals(other.xid)) + return false; + return true; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java new file mode 100644 index 0000000..d838779 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessage.java @@ -0,0 +1,102 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.core.protocol.core.impl.PacketImpl; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class SubscribeClusterTopologyUpdatesMessage extends PacketImpl +{ + + private boolean clusterConnection; + + public SubscribeClusterTopologyUpdatesMessage(final boolean clusterConnection) + { + super(SUBSCRIBE_TOPOLOGY); + + this.clusterConnection = clusterConnection; + } + + protected SubscribeClusterTopologyUpdatesMessage(byte packetType, final boolean clusterConnection) + { + super(packetType); + + this.clusterConnection = clusterConnection; + } + + public SubscribeClusterTopologyUpdatesMessage() + { + super(SUBSCRIBE_TOPOLOGY); + } + + protected SubscribeClusterTopologyUpdatesMessage(byte packetType) + { + super(packetType); + } + + // Public -------------------------------------------------------- + + public boolean isClusterConnection() + { + return clusterConnection; + } + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + buffer.writeBoolean(clusterConnection); + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + clusterConnection = buffer.readBoolean(); + } + + @Override + public String toString() + { + return "SubscribeClusterTopologyUpdatesMessage [clusterConnection=" + clusterConnection + + ", toString()=" + + super.toString() + + "]"; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (clusterConnection ? 
1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SubscribeClusterTopologyUpdatesMessage)) + return false; + SubscribeClusterTopologyUpdatesMessage other = (SubscribeClusterTopologyUpdatesMessage)obj; + if (clusterConnection != other.clusterConnection) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java new file mode 100644 index 0000000..a129ce0 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/SubscribeClusterTopologyUpdatesMessageV2.java @@ -0,0 +1,87 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.protocol.core.impl.wireformat; + +import org.apache.activemq6.api.core.HornetQBuffer; + +/** + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class SubscribeClusterTopologyUpdatesMessageV2 extends SubscribeClusterTopologyUpdatesMessage +{ + + private int clientVersion; + + public SubscribeClusterTopologyUpdatesMessageV2(final boolean clusterConnection, int clientVersion) + { + super(SUBSCRIBE_TOPOLOGY_V2, clusterConnection); + + this.clientVersion = clientVersion; + } + + public SubscribeClusterTopologyUpdatesMessageV2() + { + super(SUBSCRIBE_TOPOLOGY_V2); + } + + // Public -------------------------------------------------------- + + + + @Override + public void encodeRest(final HornetQBuffer buffer) + { + super.encodeRest(buffer); + buffer.writeInt(clientVersion); + } + + /** + * @return the clientVersion + */ + public int getClientVersion() + { + return clientVersion; + } + + @Override + public void decodeRest(final HornetQBuffer buffer) + { + super.decodeRest(buffer); + clientVersion = buffer.readInt(); + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + clientVersion; + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (!(obj instanceof SubscribeClusterTopologyUpdatesMessageV2)) + return false; + SubscribeClusterTopologyUpdatesMessageV2 other = (SubscribeClusterTopologyUpdatesMessageV2)obj; + if (clientVersion != other.clientVersion) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/CloseListener.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/CloseListener.java 
/**
 * CloseListeners can be registered with a {@link org.apache.activemq6.spi.core.protocol.RemotingConnection}
 * to get notified when the connection is closed.
 * <p>
 * Registration is done through
 * {@link org.apache.activemq6.spi.core.protocol.RemotingConnection#addCloseListener(CloseListener)}.
 *
 * @author Andy Taylor
 */
public interface CloseListener
{
   /**
    * Called when the connection is closed.
    */
   void connectionClosed();
}

/**
 * A FailureListener notifies the user when a connection failure occurred.
 * <p>
 * Both callbacks report the same event; the three-argument form additionally carries the ID of a
 * scale-down target node.
 *
 * @author Tim Fox
 * @author Jeff Mesnil
 */
public interface FailureListener
{
   /**
    * Notifies that a connection has failed due to the specified exception.
    *
    * @param exception  exception which has caused the connection to fail
    * @param failedOver NOTE(review): undocumented in the original; presumably whether the connection
    *                   was failed over to another server - confirm against implementations
    */
   void connectionFailed(HornetQException exception, boolean failedOver);

   /**
    * Notifies that a connection has failed due to the specified exception.
    *
    * @param exception             exception which has caused the connection to fail
    * @param failedOver            NOTE(review): undocumented in the original; presumably whether the
    *                              connection was failed over to another server - confirm against
    *                              implementations
    * @param scaleDownTargetNodeID the ID of the node to which messages are scaling down
    */
   void connectionFailed(HornetQException exception, boolean failedOver, String scaleDownTargetNodeID);
}
+ */ + +package org.apache.activemq6.core.remoting.impl; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq6.api.core.TransportConfigurationHelper; +import org.apache.activemq6.utils.ClassloadingUtil; + +/** + * Stores static mappings of class names to ConnectorFactory instances to act as a central repo for ConnectorFactory + * objects. + * + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + */ + +public class TransportConfigurationUtil +{ + private static final Map<String, Map<String, Object>> DEFAULTS = new HashMap<>(); + + private static final HashMap<String, Object> EMPTY_HELPER = new HashMap<>(); + + public static Map<String, Object> getDefaults(String className) + { + if (className == null) + { + /* Returns a new clone of the empty helper. This allows any parent objects to update the map key/values + without polluting the EMPTY_HELPER map. */ + return (Map<String, Object>) EMPTY_HELPER.clone(); + } + + if (!DEFAULTS.containsKey(className)) + { + Object object = instantiateObject(className); + if (object != null && object instanceof TransportConfigurationHelper) + { + + DEFAULTS.put(className, ((TransportConfigurationHelper) object).getDefaults()); + } + else + { + DEFAULTS.put(className, EMPTY_HELPER); + } + } + + /* We need to return a copy of the default Map. 
This means the defaults parent is able to update the map without + modifying the original */ + return cloneDefaults(DEFAULTS.get(className)); + } + + private static Object instantiateObject(final String className) + { + return AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + try + { + return ClassloadingUtil.newInstanceFromClassLoader(className); + } + catch (IllegalStateException e) + { + return null; + } + } + }); + } + + private static Map<String, Object> cloneDefaults(Map<String, Object> defaults) + { + Map<String, Object> cloned = new HashMap<String, Object>(); + for (Map.Entry entry : defaults.entrySet()) + { + cloned.put((String) entry.getKey(), entry.getValue()); + } + return cloned; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java new file mode 100644 index 0000000..7890953 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQAMQPFrameDecoder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +/** + * A Netty decoder specially optimised to to decode messages on the core protocol only + * + * @author <a href="[email protected]">Trustin Lee</a> + * @author <a href="[email protected]">Norman Maurer</a> + * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, ê¸) $ + */ +public class HornetQAMQPFrameDecoder extends LengthFieldBasedFrameDecoder +{ + public HornetQAMQPFrameDecoder() + { + // The interface itself is part of the buffer (hence the -4) + super(Integer.MAX_VALUE, 0, 4, -4 , 0); + } + + + @Override + protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) + { + return super.extractFrame(ctx, buffer, index, length); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java new file mode 100644 index 0000000..5cbcb14 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQChannelHandler.java @@ -0,0 +1,122 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.group.ChannelGroup; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.spi.core.remoting.BufferHandler; +import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener; + + +/** + * Common handler implementation for client and server side handler. 
+ * + * @author <a href="mailto:[email protected]">Trustin Lee</a> + * @author <a href="mailto:[email protected]">Norman Maurer</a> + * @version $Rev$, $Date$ + */ +public class HornetQChannelHandler extends ChannelDuplexHandler +{ + private final ChannelGroup group; + + private final BufferHandler handler; + + private final ConnectionLifeCycleListener listener; + + volatile boolean active; + + protected HornetQChannelHandler(final ChannelGroup group, + final BufferHandler handler, + final ConnectionLifeCycleListener listener) + { + this.group = group; + this.handler = handler; + this.listener = listener; + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception + { + group.add(ctx.channel()); + ctx.fireChannelActive(); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception + { + // TODO: Think about the id thingy + listener.connectionReadyForWrites(channelId(ctx.channel()), ctx.channel().isWritable()); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception + { + ByteBuf buffer = (ByteBuf) msg; + + handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer)); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception + { + synchronized (this) + { + if (active) + { + listener.connectionDestroyed(channelId(ctx.channel())); + + active = false; + } + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception + { + if (!active) + { + return; + } + // We don't want to log this - since it is normal for this to happen during failover/reconnect + // and we don't want to spew out stack traces in that event + // The user has access to this exeception anyway via the HornetQException initial cause + + HornetQException me = HornetQClientMessageBundle.BUNDLE.nettyError(); + me.initCause(cause); + + synchronized 
(listener) + { + try + { + listener.connectionException(channelId(ctx.channel()), me); + active = false; + } + catch (Exception ex) + { + HornetQClientLogger.LOGGER.errorCallingLifeCycleListener(ex); + } + } + } + + protected static int channelId(Channel channel) + { + return channel.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java new file mode 100644 index 0000000..31f9c80 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/HornetQFrameDecoder2.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.activemq6.utils.DataConstants; + +/** + * A Netty decoder specially optimised to to decode messages on the core protocol only + * + * @author <a href="[email protected]">Trustin Lee</a> + * @author <a href="[email protected]">Norman Maurer</a> + * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, ê¸) $ + */ +public class HornetQFrameDecoder2 extends LengthFieldBasedFrameDecoder +{ + public HornetQFrameDecoder2() + { + super(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT); + } + + @Override + protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) + { + return super.extractFrame(ctx, buffer, index, length).skipBytes(DataConstants.SIZE_INT); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java new file mode 100644 index 0000000..015a864 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/remoting/impl/netty/NettyConnection.java @@ -0,0 +1,448 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.remoting.impl.netty; + +import java.net.SocketAddress; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Semaphore; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.handler.ssl.SslHandler; +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQBuffers; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.security.HornetQPrincipal; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq6.spi.core.remoting.ReadyListener; +import org.apache.activemq6.utils.ConcurrentHashSet; + +/** + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="[email protected]">Norman Maurer</a> + */ +public class NettyConnection implements Connection +{ + // Constants ----------------------------------------------------- + 
private static final int BATCHING_BUFFER_SIZE = 8192; + + // Attributes ---------------------------------------------------- + + protected final Channel channel; + + private boolean closed; + + private final ConnectionLifeCycleListener listener; + + private final boolean batchingEnabled; + + private final boolean directDeliver; + + private volatile HornetQBuffer batchBuffer; + + private final Map<String, Object> configuration; + + private final Semaphore writeLock = new Semaphore(1); + + private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<ReadyListener>(); + + private RemotingConnection protocolConnection; + +// Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public NettyConnection(final Map<String, Object> configuration, + final Channel channel, + final ConnectionLifeCycleListener listener, + boolean batchingEnabled, + boolean directDeliver) + { + this.configuration = configuration; + + this.channel = channel; + + this.listener = listener; + + this.batchingEnabled = batchingEnabled; + + this.directDeliver = directDeliver; + } + + // Public -------------------------------------------------------- + + public Channel getNettyChannel() + { + return channel; + } + // Connection implementation ---------------------------- + + + public void forceClose() + { + if (channel != null) + { + try + { + channel.close(); + } + catch (Throwable e) + { + HornetQClientLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + + + /** + * This is exposed so users would have the option to look at any data through interceptors + * + * @return + */ + public Channel getChannel() + { + return channel; + } + + public RemotingConnection getProtocolConnection() + { + return protocolConnection; + } + + public void setProtocolConnection(RemotingConnection protocolConnection) + { + this.protocolConnection = protocolConnection; + } + + public void close() + { + if (closed) + { + return; + } + 
+ final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); + EventLoop eventLoop = channel.eventLoop(); + boolean inEventLoop = eventLoop.inEventLoop(); + //if we are in an event loop we need to close the channel after the writes have finished + if (!inEventLoop) + { + closeSSLAndChannel(sslHandler, channel); + } + else + { + eventLoop.execute(new Runnable() + { + @Override + public void run() + { + closeSSLAndChannel(sslHandler, channel); + } + }); + } + + closed = true; + + listener.connectionDestroyed(getID()); + } + + public HornetQBuffer createBuffer(final int size) + { + return new ChannelBufferWrapper(channel.alloc().buffer(size)); + } + + public Object getID() + { + // TODO: Think of it + return channel.hashCode(); + } + + // This is called periodically to flush the batch buffer + public void checkFlushBatchBuffer() + { + if (!batchingEnabled) + { + return; + } + + if (writeLock.tryAcquire()) + { + try + { + if (batchBuffer != null && batchBuffer.readable()) + { + channel.writeAndFlush(batchBuffer.byteBuf()); + + batchBuffer = createBuffer(BATCHING_BUFFER_SIZE); + } + } + finally + { + writeLock.release(); + } + } + } + + public void write(final HornetQBuffer buffer) + { + write(buffer, false, false); + } + + public void write(HornetQBuffer buffer, final boolean flush, final boolean batched) + { + write(buffer, flush, batched, null); + } + + public void write(HornetQBuffer buffer, final boolean flush, final boolean batched, final ChannelFutureListener futureListener) + { + + try + { + writeLock.acquire(); + + try + { + if (batchBuffer == null && batchingEnabled && batched && !flush) + { + // Lazily create batch buffer + + batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE); + } + + if (batchBuffer != null) + { + batchBuffer.writeBytes(buffer, 0, buffer.writerIndex()); + + if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched || flush) + { + // If the batch buffer is full or it's flush param or not batched then flush 
the buffer + + buffer = batchBuffer; + } + else + { + return; + } + + if (!batched || flush) + { + batchBuffer = null; + } + else + { + // Create a new buffer + + batchBuffer = HornetQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE); + } + } + + // depending on if we need to flush or not we can use a voidPromise or + // use a normal promise + final ByteBuf buf = buffer.byteBuf(); + final ChannelPromise promise; + if (flush || futureListener != null) + { + promise = channel.newPromise(); + } + else + { + promise = channel.voidPromise(); + } + + EventLoop eventLoop = channel.eventLoop(); + boolean inEventLoop = eventLoop.inEventLoop(); + if (!inEventLoop) + { + if (futureListener != null) + { + channel.writeAndFlush(buf, promise).addListener(futureListener); + } + else + { + channel.writeAndFlush(buf, promise); + } + } + else + { + // create a task which will be picked up by the eventloop and trigger the write. + // This is mainly needed as this method is triggered by different threads for the same channel. + // if we not do this we may produce out of order writes. 
+ final Runnable task = new Runnable() + { + @Override + public void run() + { + if (futureListener != null) + { + channel.writeAndFlush(buf, promise).addListener(futureListener); + } + else + { + channel.writeAndFlush(buf, promise); + } + } + }; + // execute the task on the eventloop + eventLoop.execute(task); + } + + + // only try to wait if not in the eventloop otherwise we will produce a deadlock + if (flush && !inEventLoop) + { + while (true) + { + try + { + boolean ok = promise.await(10000); + + if (!ok) + { + HornetQClientLogger.LOGGER.timeoutFlushingPacket(); + } + + break; + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + } + } + finally + { + writeLock.release(); + } + } + catch (InterruptedException e) + { + throw new HornetQInterruptedException(e); + } + } + + public String getRemoteAddress() + { + SocketAddress address = channel.remoteAddress(); + if (address == null) + { + return null; + } + return address.toString(); + } + + public boolean isDirectDeliver() + { + return directDeliver; + } + + public void addReadyListener(final ReadyListener listener) + { + readyListeners.add(listener); + } + + public void removeReadyListener(final ReadyListener listener) + { + readyListeners.remove(listener); + } + + //never allow this + public HornetQPrincipal getDefaultHornetQPrincipal() + { + return null; + } + + void fireReady(final boolean ready) + { + for (ReadyListener listener : readyListeners) + { + listener.readyForWriting(ready); + } + } + + + @Override + public TransportConfiguration getConnectorConfig() + { + if (configuration != null) + { + return new TransportConfiguration(NettyConnectorFactory.class.getName(), this.configuration); + } + else + { + return null; + } + } + + @Override + public boolean isUsingProtocolHandling() + { + return true; + } + + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return super.toString() + "[local= " + 
channel.localAddress() + ", remote=" + channel.remoteAddress() + "]"; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + + private void closeSSLAndChannel(SslHandler sslHandler, Channel channel) + { + if (sslHandler != null) + { + try + { + ChannelFuture sslCloseFuture = sslHandler.close(); + + if (!sslCloseFuture.awaitUninterruptibly(10000)) + { + HornetQClientLogger.LOGGER.timeoutClosingSSL(); + } + } + catch (Throwable t) + { + // ignore + } + } + + ChannelFuture closeFuture = channel.close(); + if (!closeFuture.awaitUninterruptibly(10000)) + { + HornetQClientLogger.LOGGER.timeoutClosingNettyChannel(); + } + } + // Inner classes ------------------------------------------------- + +}
