http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java new file mode 100644 index 0000000..30348cb --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/security/Role.java @@ -0,0 +1,199 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.security; + +import java.io.Serializable; + +/** + * A role is used by the security store to define access rights and is configured on a connection factory or an address. + * + * @author <a href="[email protected]">Andy Taylor</a> + */ +public class Role implements Serializable +{ + private static final long serialVersionUID = 3560097227776448872L; + + private final String name; + + private final boolean send; + + private final boolean consume; + + private final boolean createDurableQueue; + + private final boolean deleteDurableQueue; + + private final boolean createNonDurableQueue; + + private final boolean deleteNonDurableQueue; + + private final boolean manage; + + public Role(final String name, + final boolean send, + final boolean consume, + final boolean createDurableQueue, + final boolean deleteDurableQueue, + final boolean createNonDurableQueue, + final boolean deleteNonDurableQueue, + final boolean manage) + { + if (name == null) + { + throw new NullPointerException("name is null"); + } + this.name = name; + this.send = send; + this.consume = consume; + this.createDurableQueue = createDurableQueue; + this.deleteDurableQueue = deleteDurableQueue; + this.createNonDurableQueue = createNonDurableQueue; + this.deleteNonDurableQueue = deleteNonDurableQueue; + this.manage = manage; + } + + public String getName() + { + return name; + } + + public boolean isSend() + { + return send; + } + + public boolean isConsume() + { + return consume; + } + + public boolean isCreateDurableQueue() + { + return createDurableQueue; + } + + public boolean isDeleteDurableQueue() + { + return deleteDurableQueue; + } + + public boolean isCreateNonDurableQueue() + { + return createNonDurableQueue; + } + + public boolean isDeleteNonDurableQueue() + { + return deleteNonDurableQueue; + } + + @Override + public String toString() + { + StringBuffer stringReturn = new StringBuffer("Role {name=" + name + "; allows=["); + + if (send) + { + stringReturn.append(" send "); + } + if (consume) + { + stringReturn.append(" consume "); + } + if (createDurableQueue) + { + stringReturn.append(" createDurableQueue "); + } + if (deleteDurableQueue) + { + stringReturn.append(" deleteDurableQueue "); + } + if (createNonDurableQueue) + { + stringReturn.append(" createNonDurableQueue "); + } + if (deleteNonDurableQueue) + { + stringReturn.append(" deleteNonDurableQueue "); + } + + stringReturn.append("]}"); + + return stringReturn.toString(); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + Role role = (Role)o; + + if (consume != role.consume) + { + return false; + } + if (createDurableQueue != role.createDurableQueue) + { + return false; + } + if (createNonDurableQueue != role.createNonDurableQueue) + { + return false; + } + if (deleteDurableQueue != role.deleteDurableQueue) + { + return false; + } + if (deleteNonDurableQueue != role.deleteNonDurableQueue) + { + return false; + } + if (send != role.send) + { + return false; + } + if (!name.equals(role.name)) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result; + result = name.hashCode(); + result = 31 * result + (send ? 1 : 0); + result = 31 * result + (consume ? 1 : 0); + result = 31 * result + (createDurableQueue ? 1 : 0); + result = 31 * result + (deleteDurableQueue ? 1 : 0); + result = 31 * result + (createNonDurableQueue ? 1 : 0); + result = 31 * result + (deleteNonDurableQueue ? 1 : 0); + return result; + } + + public boolean isManage() + { + return manage; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java new file mode 100644 index 0000000..9fba0b8 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/QueueQueryResult.java @@ -0,0 +1,124 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.server; + +import org.apache.activemq6.api.core.SimpleString; + +/** + * + * A QueueQueryResult + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public class QueueQueryResult +{ + private SimpleString name; + + private boolean exists; + + private boolean durable; + + private int consumerCount; + + private long messageCount; + + private SimpleString filterString; + + private SimpleString address; + + private boolean temporary; + + public QueueQueryResult(final SimpleString name, + final SimpleString address, + final boolean durable, + final boolean temporary, + final SimpleString filterString, + final int consumerCount, + final long messageCount) + { + this(name, address, durable, temporary, filterString, consumerCount, messageCount, true); + } + + public QueueQueryResult() + { + this(null, null, false, false, null, 0, 0, false); + } + + private QueueQueryResult(final SimpleString name, + final SimpleString address, + final boolean durable, + final boolean temporary, + final SimpleString filterString, + final int consumerCount, + final long messageCount, + final boolean exists) + { + this.durable = durable; + + this.temporary = temporary; + + this.consumerCount = consumerCount; + + this.messageCount = messageCount; + + this.filterString = filterString; + + this.address = address; + + this.name = name; + + this.exists = exists; + } + + public boolean isExists() + { + return exists; + } + + public boolean isDurable() + { + return durable; + } + + public int getConsumerCount() + { + return consumerCount; + } + + public long getMessageCount() + { + return messageCount; + } + + public SimpleString getFilterString() + { + return filterString; + } + + public SimpleString getAddress() + { + return address; + } + + public SimpleString getName() + { + return name; + } + + public boolean isTemporary() + { + return temporary; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java new file mode 100644 index 0000000..214dfdf --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/Notification.java @@ -0,0 +1,59 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.server.management; + +import org.apache.activemq6.api.core.management.NotificationType; +import org.apache.activemq6.utils.TypedProperties; + +/** + * A Notification + * @see org.apache.activemq6.core.server.management.NotificationListener + * @see NotificationType + * @author <a href="mailto:[email protected]">Tim Fox</a> Created 22 Jan 2009 16:41:12 + */ +public final class Notification +{ + private final NotificationType type; + + private final TypedProperties properties; + + private final String uid; + + public Notification(final String uid, final NotificationType type, final TypedProperties properties) + { + this.uid = uid; + this.type = type; + this.properties = properties; + } + + public NotificationType getType() + { + return type; + } + + public TypedProperties getProperties() + { + return properties; + } + + public String getUID() + { + return uid; + } + + @Override + public String toString() + { + return "Notification[uid=" + uid + ", type=" + type + ", properties=" + properties + "]"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java new file mode 100644 index 0000000..f20b1af --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.server.management; + + +/** + * A NotificationListener + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + * Created 22 Jan 2009 16:48:27 + * + * + */ +public interface NotificationListener +{ + void onNotification(Notification notification); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java new file mode 100644 index 0000000..8cedaff --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/server/management/NotificationService.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.server.management; + +/** + * A NotificationService + * + * @author jmesnil + */ +public interface NotificationService +{ + /** + * the message corresponding to a notification will always contain the properties: + * <ul> + * <li><code>ManagementHelper.HDR_NOTIFICATION_TYPE</code> - the type of notification (SimpleString)</li> + * <li><code>ManagementHelper.HDR_NOTIFICATION_MESSAGE</code> - a message contextual to the notification (SimpleString)</li> + * <li><code>ManagementHelper.HDR_NOTIFICATION_TIMESTAMP</code> - the timestamp when the notification occurred (long)</li> + * </ul> + * in addition to the properties defined in <code>props</code> + * + * @see ManagementHelper + */ + void sendNotification(Notification notification) throws Exception; + + void enableNotifications(boolean enable); + + void addNotificationListener(NotificationListener listener); + + void removeNotificationListener(NotificationListener listener); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java new file mode 100644 index 0000000..2904027 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/settings/impl/AddressFullMessagePolicy.java @@ -0,0 +1,25 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.settings.impl; + +/** + * A AddressFullMessagePolicy + * + * @author Tim Fox + * + * + */ +public enum AddressFullMessagePolicy +{ + DROP, PAGE, BLOCK, FAIL; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java new file mode 100644 index 0000000..e2963a7 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/transaction/impl/XidImpl.java @@ -0,0 +1,218 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.transaction.impl; + +import java.io.Serializable; +import java.util.Arrays; + +import javax.transaction.xa.Xid; + +import org.apache.activemq6.utils.Base64; + +/** + * + * Xid implementation + * + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Juha Lindfors</a> + * + * @version $Revision 1.1 $ + */ +public class XidImpl implements Xid, Serializable +{ + private static final long serialVersionUID = 407053232840068514L; + + private final byte[] branchQualifier; + + private final int formatId; + + private final byte[] globalTransactionId; + + private int hash; + + private boolean hashCalculated; + + // Static -------------------------------------------------------- + + public static String toBase64String(final Xid xid) + { + byte[] data = XidImpl.toByteArray(xid); + return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + private static byte[] toByteArray(final Xid xid) + { + byte[] branchQualifier = xid.getBranchQualifier(); + byte[] globalTransactionId = xid.getGlobalTransactionId(); + int formatId = xid.getFormatId(); + + byte[] hashBytes = new byte[branchQualifier.length + globalTransactionId.length + 4]; + System.arraycopy(branchQualifier, 0, hashBytes, 0, branchQualifier.length); + System.arraycopy(globalTransactionId, 0, hashBytes, branchQualifier.length, globalTransactionId.length); + byte[] intBytes = new byte[4]; + for (int i = 0; i < 4; i++) + { + intBytes[i] = (byte)((formatId >> i * 8) % 0xFF); + } + System.arraycopy(intBytes, 0, hashBytes, branchQualifier.length + globalTransactionId.length, 4); + return hashBytes; + } + + // Constructors -------------------------------------------------- + + /** + * Standard constructor + * @param branchQualifier + * @param formatId + * @param globalTransactionId + */ + public XidImpl(final byte[] branchQualifier, final int formatId, final byte[] globalTransactionId) + { + this.branchQualifier = branchQualifier; + this.formatId = formatId; + this.globalTransactionId = globalTransactionId; + } + + /** + * Copy constructor + * @param other + */ + public XidImpl(final Xid other) + { + branchQualifier = copyBytes(other.getBranchQualifier()); + formatId = other.getFormatId(); + globalTransactionId = copyBytes(other.getGlobalTransactionId()); + } + + // Xid implementation ------------------------------------------------------------------ + + public byte[] getBranchQualifier() + { + return branchQualifier; + } + + public int getFormatId() + { + return formatId; + } + + public byte[] getGlobalTransactionId() + { + return globalTransactionId; + } + + // Public ------------------------------------------------------------------------------- + + @Override + public int hashCode() + { + if (!hashCalculated) + { + calcHash(); + } + return hash; + } + + @Override + public boolean equals(final Object other) + { + if (this == other) + { + return true; + } + if (!(other instanceof Xid)) + { + return false; + } + Xid xother = (Xid)other; + if (xother.getFormatId() != formatId) + { + return false; + } + if (xother.getBranchQualifier().length != branchQualifier.length) + { + return false; + } + if (xother.getGlobalTransactionId().length != globalTransactionId.length) + { + return false; + } + for (int i = 0; i < branchQualifier.length; i++) + { + byte[] otherBQ = xother.getBranchQualifier(); + if (branchQualifier[i] != otherBQ[i]) + { + return false; + } + } + for (int i = 0; i < globalTransactionId.length; i++) + { + byte[] otherGtx = xother.getGlobalTransactionId(); + if (globalTransactionId[i] != otherGtx[i]) + { + return false; + } + } + return true; + } + + @Override + public String toString() + { + return "XidImpl (" + System.identityHashCode(this) + + " bq:" + + stringRep(branchQualifier) + + " formatID:" + + formatId + + " gtxid:" + + stringRep(globalTransactionId) + + " base64:" + toBase64String(this); + } + + // Private ------------------------------------------------------------------------------- + + private String stringRep(final byte[] bytes) + { + StringBuffer buff = new StringBuffer(); + for (int i = 0; i < bytes.length; i++) + { + byte b = bytes[i]; + + buff.append(b); + + if (i != bytes.length - 1) + { + buff.append('.'); + } + } + + return buff.toString(); + } + + private void calcHash() + { + byte[] hashBytes = XidImpl.toByteArray(this); + hash = Arrays.hashCode(hashBytes); + hashCalculated = true; + } + + private byte[] copyBytes(final byte[] other) + { + byte[] bytes = new byte[other.length]; + + System.arraycopy(other, 0, bytes, 0, other.length); + + return bytes; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java new file mode 100644 index 0000000..783a7a6 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/Version.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.version; + +/** + * + * A VersionImpl + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * + */ +public interface Version +{ + String getFullVersion(); + + String getVersionName(); + + int getMajorVersion(); + + int getMinorVersion(); + + int getMicroVersion(); + + String getVersionSuffix(); + + int getIncrementingVersion(); + + boolean isCompatible(int v); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java new file mode 100644 index 0000000..7295dfb --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/version/impl/VersionImpl.java @@ -0,0 +1,202 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.version.impl; + +import java.io.Serializable; +import java.util.Arrays; + +import org.apache.activemq6.core.version.Version; + +/** + * A VersionImpl + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class VersionImpl implements Version, Serializable +{ + private static final long serialVersionUID = -5271227256591080403L; + + private final String versionName; + + private final int majorVersion; + + private final int minorVersion; + + private final int microVersion; + + private final int incrementingVersion; + + private final String versionSuffix; + + private final int[] compatibleVersionList; + + // Constructors -------------------------------------------------- + + public VersionImpl(final String versionName, + final int majorVersion, + final int minorVersion, + final int microVersion, + final int incrementingVersion, + final String versionSuffix, + final int[] compatibleVersionList) + { + this.versionName = versionName; + + this.majorVersion = majorVersion; + + this.minorVersion = minorVersion; + + this.microVersion = microVersion; + + this.incrementingVersion = incrementingVersion; + + this.versionSuffix = versionSuffix; + + this.compatibleVersionList = Arrays.copyOf(compatibleVersionList, compatibleVersionList.length); + } + + // Version implementation ------------------------------------------ + + public String getFullVersion() + { + return majorVersion + "." + + minorVersion + + "." + + microVersion + + "." + + versionSuffix + + " (" + + versionName + + ", " + + incrementingVersion + + ")"; + } + + public String getVersionName() + { + return versionName; + } + + public int getMajorVersion() + { + return majorVersion; + } + + public int getMinorVersion() + { + return minorVersion; + } + + public int getMicroVersion() + { + return microVersion; + } + + public String getVersionSuffix() + { + return versionSuffix; + } + + public int getIncrementingVersion() + { + return incrementingVersion; + } + + public boolean isCompatible(int version) + { + for (int element : compatibleVersionList) + { + if (element == version) + { + return true; + } + } + return false; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(compatibleVersionList); + result = prime * result + incrementingVersion; + result = prime * result + majorVersion; + result = prime * result + microVersion; + result = prime * result + minorVersion; + result = prime * result + ((versionName == null) ? 0 : versionName.hashCode()); + result = prime * result + ((versionSuffix == null) ? 0 : versionSuffix.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (!(obj instanceof VersionImpl)) + { + return false; + } + VersionImpl other = (VersionImpl)obj; + if (!Arrays.equals(compatibleVersionList, other.compatibleVersionList)) + { + return false; + } + if (incrementingVersion != other.incrementingVersion) + { + return false; + } + if (majorVersion != other.majorVersion) + { + return false; + } + if (microVersion != other.microVersion) + { + return false; + } + if (minorVersion != other.minorVersion) + { + return false; + } + if (versionName == null) + { + if (other.versionName != null) + { + return false; + } + } + else if (!versionName.equals(other.versionName)) + { + return false; + } + if (versionSuffix == null) + { + if (other.versionSuffix != null) + { + return false; + } + } + else if (!versionSuffix.equals(other.versionSuffix)) + { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java new file mode 100644 index 0000000..21bc209 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/BytesMessageUtil.java @@ -0,0 +1,233 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.reader; + +import org.apache.activemq6.api.core.Message; + +/** + * @author Clebert Suconic + */ + +public class BytesMessageUtil extends MessageUtil +{ + + public static boolean bytesReadBoolean(Message message) + { + return getBodyBuffer(message).readBoolean(); + } + + public static byte bytesReadByte(Message message) + { + return getBodyBuffer(message).readByte(); + } + + public static int bytesReadUnsignedByte(Message message) + { + return getBodyBuffer(message).readUnsignedByte(); + } + + public static short bytesReadShort(Message message) + { + return getBodyBuffer(message).readShort(); + } + + public static int bytesReadUnsignedShort(Message message) + { + return getBodyBuffer(message).readUnsignedShort(); + } + + public static char bytesReadChar(Message message) + { + return (char)getBodyBuffer(message).readShort(); + } + + public static int bytesReadInt(Message message) + { + return getBodyBuffer(message).readInt(); + } + + public static long bytesReadLong(Message message) + { + return getBodyBuffer(message).readLong(); + } + + public static float bytesReadFloat(Message message) + { + return Float.intBitsToFloat(getBodyBuffer(message).readInt()); + } + + public static double bytesReadDouble(Message message) + { + return Double.longBitsToDouble(getBodyBuffer(message).readLong()); + } + + public static String bytesReadUTF(Message message) + { + return getBodyBuffer(message).readUTF(); + } + + + + public static int bytesReadBytes(Message message, final byte[] value) + { + return bytesReadBytes(message, value, value.length); + } + + public static int bytesReadBytes(Message message, final byte[] value, final int length) + { + if (!getBodyBuffer(message).readable()) + { + return -1; + } + + int read = Math.min(length, getBodyBuffer(message).readableBytes()); + + if (read != 0) + { + getBodyBuffer(message).readBytes(value, 0, read); + } + + return read; + + } + + + public static void bytesWriteBoolean(Message message, boolean value) + { + getBodyBuffer(message).writeBoolean(value); + } + + + + public static void bytesWriteByte(Message message, byte value) + { + getBodyBuffer(message).writeByte(value); + } + + + + public static void bytesWriteShort(Message message, short value) + { + getBodyBuffer(message).writeShort(value); + } + + + public static void bytesWriteChar(Message message, char value) + { + getBodyBuffer(message).writeShort((short)value); + } + + public static void bytesWriteInt(Message message, int value) + { + getBodyBuffer(message).writeInt(value); + } + + public static void bytesWriteLong(Message message, long value) + { + getBodyBuffer(message).writeLong(value); + } + + public static void bytesWriteFloat(Message message, float value) + { + getBodyBuffer(message).writeInt(Float.floatToIntBits(value)); + } + + public static void bytesWriteDouble(Message message, double value) + { + getBodyBuffer(message).writeLong(Double.doubleToLongBits(value)); + } + + public static void bytesWriteUTF(Message message, String value) + { + getBodyBuffer(message).writeUTF(value); + } + + public static void bytesWriteBytes(Message message, byte[] value) + { + getBodyBuffer(message).writeBytes(value); + } + + public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length) + { + getBodyBuffer(message).writeBytes(value, offset, length); + } + + + /** + * Returns true if it could send the Object to any known format + * @param message + * @param value + * @return + */ + public static boolean bytesWriteObject(Message message, Object value) + { + if (value == null) + { + throw new NullPointerException("Attempt to write a null value"); + } + if (value instanceof String) + { + bytesWriteUTF(message, (String) value); + } + else if (value instanceof Boolean) + { + bytesWriteBoolean(message, (Boolean) value); + } + else if (value instanceof Character) + { + bytesWriteChar(message, (Character) value); + } + else if (value instanceof Byte) + { + bytesWriteByte(message, (Byte) value); + } + else if (value instanceof Short) + { + bytesWriteShort(message, (Short) value); + } + else if (value instanceof Integer) + { + bytesWriteInt(message, (Integer) value); + } + else if (value instanceof Long) + { + bytesWriteLong(message, (Long) value); + } + else if (value instanceof Float) + { + bytesWriteFloat(message, (Float) value); + } + else if (value instanceof Double) + { + bytesWriteDouble(message, (Double) value); + } + else if (value instanceof byte[]) + { + bytesWriteBytes(message, (byte[]) value); + } + else + { + return false; + } + + + return true; + } + + public static void bytesMessageReset(Message message) + { + getBodyBuffer(message).resetReaderIndex(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java new file mode 100644 index 0000000..734b413 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MapMessageUtil.java @@ -0,0 +1,59 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.reader; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.utils.TypedProperties; + +/** + * @author Clebert Suconic + */ + +public class MapMessageUtil extends MessageUtil +{ + + /** + * Utility method to set the map on a message body + */ + public static void writeBodyMap(Message message, TypedProperties properties) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.resetWriterIndex(); + properties.encode(buff); + } + + /** + * Utility method to set the map on a message body + */ + public static TypedProperties readBodyMap(Message message) + { + TypedProperties map = new TypedProperties(); + readBodyMap(message, map); + return map; + } + + /** + * Utility method to set the map on a message body + */ + public static void readBodyMap(Message message, TypedProperties map) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.resetReaderIndex(); + map.decode(buff); + } + + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java new file mode 100644 index 0000000..1b0bba1 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/MessageUtil.java @@ -0,0 +1,201 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.reader; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQPropertyConversionException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; + +/** + * static methods intended for import static on JMS like messages. + * + * This provides a helper for core message to act some of the JMS functions used by the JMS wrapper + * + * @author Clebert Suconic + */ + +public class MessageUtil +{ + public static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID"); + + public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo"); + + public static final SimpleString TYPE_HEADER_NAME = new SimpleString("JMSType"); + + public static final SimpleString JMS = new SimpleString("JMS"); + + public static final SimpleString JMSX = new SimpleString("JMSX"); + + public static final SimpleString JMS_ = new SimpleString("JMS_"); + + public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount"; + + public static final String JMSXGROUPID = "JMSXGroupID"; + + public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__HQ_CID"); + + + + public static HornetQBuffer getBodyBuffer(Message message) + { + return message.getBodyBuffer(); + } + + + + public static byte[] getJMSCorrelationIDAsBytes(Message message) + { + Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME); + + if (obj instanceof byte[]) + { + return (byte[])obj; + } + else + { + return null; + } + } + + + + public static void setJMSType(Message message, String type) + { + message.putStringProperty(TYPE_HEADER_NAME, new SimpleString(type)); + } + + public static String getJMSType(Message message) + { + SimpleString ss = message.getSimpleStringProperty(TYPE_HEADER_NAME); + + if (ss != null) + { + return ss.toString(); + } + else + { + return null; + } + } + + + public static final void setJMSCorrelationIDAsBytes(Message message, final byte[] correlationID) throws HornetQException + { + if (correlationID == null || correlationID.length == 0) + { + throw new HornetQException("Please specify a non-zero length byte[]"); + } + message.putBytesProperty(CORRELATIONID_HEADER_NAME, correlationID); + } + + public static void setJMSCorrelationID(Message message, final String correlationID) + { + if (correlationID == null) + { + message.removeProperty(CORRELATIONID_HEADER_NAME); + } + else + { + message.putStringProperty(CORRELATIONID_HEADER_NAME, new SimpleString(correlationID)); + } + } + + public static String getJMSCorrelationID(Message message) + { + try + { + return message.getStringProperty(CORRELATIONID_HEADER_NAME); + } + catch (HornetQPropertyConversionException e) + { + return null; + } + } + + + public static SimpleString getJMSReplyTo(Message message) + { + return message.getSimpleStringProperty(REPLYTO_HEADER_NAME); + } + + public static void setJMSReplyTo(Message message, final SimpleString dest) + { + + if (dest == null) + { + message.removeProperty(REPLYTO_HEADER_NAME); + } + else + { + + message.putStringProperty(REPLYTO_HEADER_NAME, dest); + } + } + + + + public static void clearProperties(Message message) + { + + List<SimpleString> toRemove = new ArrayList<SimpleString>(); + + for (SimpleString propName : message.getPropertyNames()) + { + if (!propName.startsWith(JMS) || propName.startsWith(JMSX) || + propName.startsWith(JMS_)) + { + toRemove.add(propName); + } + } + + for (SimpleString propName : toRemove) + { + message.removeProperty(propName); + } + } + + + + public static Set<String> getPropertyNames(Message message) + { + HashSet<String> set = new HashSet<String>(); + + for (SimpleString propName : message.getPropertyNames()) + { + if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || + propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) + { + set.add(propName.toString()); + } + } + + set.add(JMSXDELIVERYCOUNT); + + return set; + } + + public static boolean propertyExists(Message message, String name) + { + return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) || + MessageUtil.JMSXGROUPID.equals(name) && + message.containsProperty(org.apache.activemq6.api.core.Message.HDR_GROUP_ID); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java new file mode 100644 index 0000000..b92132e --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/StreamMessageUtil.java @@ -0,0 +1,300 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.reader; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.utils.DataConstants; + +/** + * @author Clebert Suconic + */ + +public class StreamMessageUtil extends MessageUtil +{ + /** + * Method to read boolean values out of the Stream protocol existent on JMS Stream Messages + * Throws IllegalStateException if the type was invalid + * + * @param message + * @return + */ + public static boolean streamReadBoolean(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + + switch (type) + { + case DataConstants.BOOLEAN: + return buff.readBoolean(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Boolean.valueOf(s); + default: + throw new IllegalStateException("Invalid conversion, type byte was " + type); + } + + } + + public static byte streamReadByte(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + int index = buff.readerIndex(); + try + { + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Byte.parseByte(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + catch (NumberFormatException e) + { + buff.readerIndex(index); + throw e; + } + + } + + public static short streamReadShort(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Short.parseShort(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + public static char streamReadChar(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.CHAR: + return (char)buff.readShort(); + case DataConstants.STRING: + String str = buff.readNullableString(); + if (str == null) + { + throw new NullPointerException("Invalid conversion"); + } + else + { + throw new IllegalStateException("Invalid conversion"); + } + default: + throw new IllegalStateException("Invalid conversion"); + } + + } + + public static int streamReadInteger(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.INT: + return buff.readInt(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Integer.parseInt(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + + public static long streamReadLong(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.INT: + return buff.readInt(); + case DataConstants.LONG: + return buff.readLong(); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Long.parseLong(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + public static float streamReadFloat(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.FLOAT: + return Float.intBitsToFloat(buff.readInt()); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Float.parseFloat(s); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + + public static double streamReadDouble(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.FLOAT: + return Float.intBitsToFloat(buff.readInt()); + case DataConstants.DOUBLE: + return Double.longBitsToDouble(buff.readLong()); + case DataConstants.STRING: + String s = buff.readNullableString(); + return Double.parseDouble(s); + default: + throw new IllegalStateException("Invalid conversion: " + type); + } + } + + + public static String streamReadString(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BOOLEAN: + return String.valueOf(buff.readBoolean()); + case DataConstants.BYTE: + return String.valueOf(buff.readByte()); + case DataConstants.SHORT: + return String.valueOf(buff.readShort()); + case DataConstants.CHAR: + return String.valueOf((char)buff.readShort()); + case DataConstants.INT: + return String.valueOf(buff.readInt()); + case DataConstants.LONG: + return String.valueOf(buff.readLong()); + case DataConstants.FLOAT: + return String.valueOf(Float.intBitsToFloat(buff.readInt())); + case DataConstants.DOUBLE: + return String.valueOf(Double.longBitsToDouble(buff.readLong())); + case DataConstants.STRING: + return buff.readNullableString(); + default: + throw new IllegalStateException("Invalid conversion"); + } + } + + /** + * Utility for reading bytes out of streaming. + * It will return remainingBytes, bytesRead + * @param remainingBytes remaining Bytes from previous read. Send it to 0 if it was the first call for the message + * @param message + * @return a pair of remaining bytes and bytes read + */ + public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value) + { + HornetQBuffer buff = getBodyBuffer(message); + + if (remainingBytes == -1) + { + return new Pair<>(0, -1); + } + else if (remainingBytes == 0) + { + byte type = buff.readByte(); + if (type != DataConstants.BYTES) + { + throw new IllegalStateException("Invalid conversion"); + } + remainingBytes = buff.readInt(); + } + int read = Math.min(value.length, remainingBytes); + buff.readBytes(value, 0, read); + remainingBytes -= read; + if (remainingBytes == 0) + { + remainingBytes = -1; + } + return new Pair<>(remainingBytes, read); + + } + + public static Object streamReadObject(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + + byte type = buff.readByte(); + switch (type) + { + case DataConstants.BOOLEAN: + return buff.readBoolean(); + case DataConstants.BYTE: + return buff.readByte(); + case DataConstants.SHORT: + return buff.readShort(); + case DataConstants.CHAR: + return (char)buff.readShort(); + case DataConstants.INT: + return buff.readInt(); + case DataConstants.LONG: + return buff.readLong(); + case DataConstants.FLOAT: + return Float.intBitsToFloat(buff.readInt()); + case DataConstants.DOUBLE: + return Double.longBitsToDouble(buff.readLong()); + case DataConstants.STRING: + return buff.readNullableString(); + case DataConstants.BYTES: + int bufferLen = buff.readInt(); + byte[] bytes = new byte[bufferLen]; + buff.readBytes(bytes); + return bytes; + default: + throw new IllegalStateException("Invalid conversion"); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java new file mode 100644 index 0000000..8821e4e --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/TextMessageUtil.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.reader; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; + +/** + * @author Clebert Suconic + */ + +public class TextMessageUtil extends MessageUtil +{ + + /** + * Utility method to set the Text message on a message body + */ + public static void writeBodyText(Message message, SimpleString text) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.clear(); + buff.writeNullableSimpleString(text); + } + + /** + * Utility method to set the Text message on a message body + */ + public static SimpleString readBodyText(Message message) + { + HornetQBuffer buff = getBodyBuffer(message); + buff.resetReaderIndex(); + return buff.readNullableSimpleString(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java new file mode 100644 index 0000000..55cf5b5 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/reader/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * Provides reading methods for JMS like objects. + * This isolates the logic from the JMS client case you need this kind of functionality from core. + * This is also used on conversions between protocols such as AMQP and Core where + * this is done at the server's + * @author Clebert Suconic + */ +package org.apache.activemq6.reader; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java new file mode 100644 index 0000000..e826e0b --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/AbstractRemotingConnection.java @@ -0,0 +1,219 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.spi.core.protocol; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQInterruptedException; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.remoting.CloseListener; +import org.apache.activemq6.core.remoting.FailureListener; +import org.apache.activemq6.spi.core.remoting.Connection; + +/** + * @author Clebert Suconic + */ + +public abstract class AbstractRemotingConnection implements RemotingConnection +{ + protected final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>(); + protected final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>(); + protected final Connection transportConnection; + protected final Executor executor; + protected final long creationTime; + protected volatile boolean dataReceived; + + public AbstractRemotingConnection(final Connection transportConnection, final Executor executor) + { + this.transportConnection = transportConnection; + this.executor = executor; + this.creationTime = System.currentTimeMillis(); + } + + public List<FailureListener> getFailureListeners() + { + return new ArrayList<FailureListener>(failureListeners); + } + + protected void callFailureListeners(final HornetQException me, String scaleDownTargetNodeID) + { + final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners); + + for (final FailureListener listener : listenersClone) + { + try + { + listener.connectionFailed(me, false, scaleDownTargetNodeID); + } + catch (HornetQInterruptedException interrupted) + { + // this is an expected behaviour.. no warn or error here + HornetQClientLogger.LOGGER.debug("thread interrupted", interrupted); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQClientLogger.LOGGER.errorCallingFailureListener(t); + } + } + } + + + protected void callClosingListeners() + { + final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners); + + for (final CloseListener listener : listenersClone) + { + try + { + listener.connectionClosed(); + } + catch (final Throwable t) + { + // Failure of one listener to execute shouldn't prevent others + // from + // executing + HornetQClientLogger.LOGGER.errorCallingFailureListener(t); + } + } + } + + public void setFailureListeners(final List<FailureListener> listeners) + { + failureListeners.clear(); + + failureListeners.addAll(listeners); + } + + public Object getID() + { + return transportConnection.getID(); + } + + public String getRemoteAddress() + { + return transportConnection.getRemoteAddress(); + } + + public void addFailureListener(final FailureListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull(); + } + failureListeners.add(listener); + } + + public boolean removeFailureListener(final FailureListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.failListenerCannotBeNull(); + } + + return failureListeners.remove(listener); + } + + public void addCloseListener(final CloseListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull(); + } + + closeListeners.add(listener); + } + + public boolean removeCloseListener(final CloseListener listener) + { + if (listener == null) + { + throw HornetQClientMessageBundle.BUNDLE.closeListenerCannotBeNull(); + } + + return closeListeners.remove(listener); + } + + public List<CloseListener> removeCloseListeners() + { + List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners); + + closeListeners.clear(); + + return ret; + } + + public List<FailureListener> removeFailureListeners() + { + List<FailureListener> ret = getFailureListeners(); + + failureListeners.clear(); + + return ret; + } + + public void setCloseListeners(List<CloseListener> listeners) + { + closeListeners.clear(); + + closeListeners.addAll(listeners); + } + + public HornetQBuffer createBuffer(final int size) + { + return transportConnection.createBuffer(size); + } + + public Connection getTransportConnection() + { + return transportConnection; + } + + public long getCreationTime() + { + return creationTime; + } + + public boolean checkDataReceived() + { + boolean res = dataReceived; + + dataReceived = false; + + return res; + } + + /* + * This can be called concurrently by more than one thread so needs to be locked + */ + public void fail(final HornetQException me) + { + fail(me, null); + } + + public void bufferReceived(final Object connectionID, final HornetQBuffer buffer) + { + dataReceived = true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java new file mode 100644 index 0000000..2ebc470 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/ConnectionEntry.java @@ -0,0 +1,50 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.protocol; + +import java.util.concurrent.Executor; + + +/** + * A ConnectionEntry + * + * @author Tim Fox + * + * + */ +public class ConnectionEntry +{ + public final RemotingConnection connection; + + public volatile long lastCheck; + + public volatile long ttl; + + public final Executor connectionExecutor; + + public Object getID() + { + return connection.getID(); + } + + public ConnectionEntry(final RemotingConnection connection, final Executor connectionExecutor, final long lastCheck, final long ttl) + { + this.connection = connection; + + this.lastCheck = lastCheck; + + this.ttl = ttl; + + this.connectionExecutor = connectionExecutor; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java new file mode 100644 index 0000000..51a048d --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/protocol/RemotingConnection.java @@ -0,0 +1,181 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.protocol; + +import java.util.List; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.remoting.CloseListener; +import org.apache.activemq6.core.remoting.FailureListener; +import org.apache.activemq6.spi.core.remoting.BufferHandler; +import org.apache.activemq6.spi.core.remoting.Connection; + +/** + * A RemotingConnection is a connection between a client and a server. + * + * + * Perhaps a better name for this class now would be ProtocolConnection as this + * represents the link with the used protocol + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + */ +public interface RemotingConnection extends BufferHandler +{ + /** + * Returns the unique id of the {@link RemotingConnection}. + * @return the id + */ + Object getID(); + + /** + * Returns the creation time of the {@link RemotingConnection}. + */ + long getCreationTime(); + + /** + * returns a string representation of the remote address of this connection + * + * @return the remote address + */ + String getRemoteAddress(); + + /** + * add a failure listener. + * <p> + * The listener will be called in the event of connection failure. + * + * @param listener the listener + */ + void addFailureListener(FailureListener listener); + + /** + * remove the failure listener + * + * @param listener the lister to remove + * @return true if removed + */ + boolean removeFailureListener(FailureListener listener); + + /** + * add a CloseListener. + * <p> + * This will be called in the event of the connection being closed. + * + * @param listener the listener to add + */ + void addCloseListener(CloseListener listener); + + /** + * remove a Close Listener + * + * @param listener the listener to remove + * @return true if removed + */ + boolean removeCloseListener(CloseListener listener); + + List<CloseListener> removeCloseListeners(); + + void setCloseListeners(List<CloseListener> listeners); + + + /** + * return all the failure listeners + * + * @return the listeners + */ + List<FailureListener> getFailureListeners(); + + List<FailureListener> removeFailureListeners(); + + + /** + * set the failure listeners. + * <p> + * These will be called in the event of the connection being closed. Any previosuly added listeners will be removed. + * + * @param listeners the listeners to add. + */ + void setFailureListeners(List<FailureListener> listeners); + + /** + * creates a new HornetQBuffer of the specified size. + * + * @param size the size of buffer required + * @return the buffer + */ + HornetQBuffer createBuffer(int size); + + /** + * called when the underlying connection fails. + * + * @param me the exception that caused the failure + */ + void fail(HornetQException me); + + /** + * called when the underlying connection fails. + * + * @param me the exception that caused the failure + * @param scaleDownTargetNodeID the ID of the node where scale down is targeted + */ + void fail(HornetQException me, String scaleDownTargetNodeID); + + /** + * destroys this connection. + */ + void destroy(); + + /** + * return the underlying Connection. + * + * @return the connection + */ + Connection getTransportConnection(); + + /** + * Returns whether or not the {@link RemotingConnection} is a client + * @return true if client, false if a server + */ + boolean isClient(); + + /** + * Returns true if this {@link RemotingConnection} has been destroyed. + * @return true if destroyed, otherwise false + */ + boolean isDestroyed(); + + /** + * Disconnect the connection, closing all channels + */ + void disconnect(boolean criticalError); + + /** + * Disconnect the connection, closing all channels + */ + void disconnect(String scaleDownNodeID, boolean criticalError); + + /** + * returns true if any data has been received since the last time this method was called. + * + * @return true if data has been received. + */ + boolean checkDataReceived(); + + /** + * flush all outstanding data from the connection. + */ + void flush(); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java new file mode 100644 index 0000000..98c477c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/AbstractConnector.java @@ -0,0 +1,30 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.remoting; + +import java.util.Map; + +/** + * Abstract connector + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + */ +public abstract class AbstractConnector implements Connector +{ + protected final Map<String, Object> configuration; + + protected AbstractConnector(Map<String, Object> configuration) + { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java new file mode 100644 index 0000000..23e853c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferDecoder.java @@ -0,0 +1,34 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.remoting; + +import org.apache.activemq6.api.core.HornetQBuffer; + +/** + * A BufferDecoder + * + * @author tim + * + * + */ +public interface BufferDecoder +{ + /** + * called by the remoting system prior to {@link org.apache.activemq6.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}. + * <p> + * The implementation should return true if there is enough data in the buffer to decode. otherwise false. + * * @param buffer the buffer + * @return true id the buffer can be decoded.. + */ + int isReadyToHandle(HornetQBuffer buffer); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java new file mode 100644 index 0000000..3f3ae91 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/BufferHandler.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.remoting; + +import org.apache.activemq6.api.core.HornetQBuffer; + +/** + * A BufferHandler that will handle buffers received by an acceptor. + * <p> + * The Buffer Handler will decode the buffer and take the appropriate action, typically forwarding to the correct channel. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public interface BufferHandler +{ + /** + * called by the remoting connection when a buffer is received. + * + * @param connectionID the connection the buffer was received on + * @param buffer the buffer to decode + */ + void bufferReceived(Object connectionID, HornetQBuffer buffer); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java new file mode 100644 index 0000000..f11faf1 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManager.java @@ -0,0 +1,78 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.spi.core.remoting; + +import java.util.List; +import java.util.concurrent.locks.Lock; + +import io.netty.channel.ChannelPipeline; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Interceptor; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * @author Clebert Suconic + */ + +public interface ClientProtocolManager +{ + + /// Life Cycle Methods: + + RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler); + + RemotingConnection getCurrentConnection(); + + Lock lockSessionCreation(); + + boolean waitOnLatch(long milliseconds) throws InterruptedException; + + /** + * This is to be called when a connection failed and we want to interrupt any communication. + * This used to be called exitLoop at some point o the code.. with a method named causeExit from ClientSessionFactoryImpl + */ + void stop(); + + boolean isAlive(); + + /// Sending methods + + void addChannelHandlers(ChannelPipeline pipeline); + + void sendSubscribeTopology(boolean isServer); + + void ping(long connectionTTL); + + SessionContext createSessionContext(final String name, + final String username, + final String password, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize) throws HornetQException; + + boolean cleanupBeforeFailover(HornetQException cause); + + boolean checkForFailover(String liveNodeID) throws HornetQException; + + void setSessionFactory(ClientSessionFactory factory); + + ClientSessionFactory getSessionFactory(); + + String getName(); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java new file mode 100644 index 0000000..49e81f9 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/ClientProtocolManagerFactory.java @@ -0,0 +1,24 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.remoting; + +import java.io.Serializable; + +/** + * @author Clebert Suconic + */ +public interface ClientProtocolManagerFactory extends Serializable +{ + + ClientProtocolManager newProtocolManager(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java new file mode 100644 index 0000000..5bd747b --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/spi/core/remoting/Connection.java @@ -0,0 +1,117 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.spi.core.remoting; + +import io.netty.channel.ChannelFutureListener; +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.core.security.HornetQPrincipal; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + +/** + * The connection used by a channel to write data to. + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author Clebert Suconic + */ +public interface Connection +{ + /** + * Create a new HornetQBuffer of the given size. + * + * @param size the size of buffer to create + * @return the new buffer. + */ + HornetQBuffer createBuffer(int size); + + + RemotingConnection getProtocolConnection(); + + void setProtocolConnection(RemotingConnection connection); + + /** + * returns the unique id of this wire. + * + * @return the id + */ + Object getID(); + + /** + * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection. + * + * @param buffer the buffer to write + * @param flush whether to flush the buffers onto the wire + * @param batched whether the packet is allowed to batched for better performance + */ + void write(HornetQBuffer buffer, boolean flush, boolean batched); + + /** + * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection. + * + * @param buffer the buffer to write + * @param flush whether to flush the buffers onto the wire + * @param batched whether the packet is allowed to batched for better performance + */ + void write(HornetQBuffer buffer, boolean flush, boolean batched, ChannelFutureListener futureListener); + + /** + * writes the buffer to the connection with no flushing or batching + * + * @param buffer the buffer to write + */ + void write(HornetQBuffer buffer); + + + /** + * This should close the internal channel without calling any listeners. + * This is to avoid a situation where the broker is busy writing on an internal thread. + * This should close the socket releasing any pending threads. + */ + void forceClose(); + + /** + * Closes the connection. + */ + void close(); + + /** + * Returns a string representation of the remote address this connection is connected to. + * @return the remote address + */ + String getRemoteAddress(); + + /** + * Called periodically to flush any data in the batch buffer + */ + void checkFlushBatchBuffer(); + + void addReadyListener(ReadyListener listener); + + void removeReadyListener(ReadyListener listener); + + /** + * Generates a {@link TransportConfiguration} to be used to connect to the same target this is + * connected to. + * @return TransportConfiguration + */ + TransportConfiguration getConnectorConfig(); + + HornetQPrincipal getDefaultHornetQPrincipal(); + + /** + * the InVM Connection has some special handling as it doesn't use Netty ProtocolChannel + * we will use this method Instead of using instanceof + * @return + */ + boolean isUsingProtocolHandling(); +} \ No newline at end of file
