http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java new file mode 100644 index 0000000..8bffddf --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/SubscriptionInfo.java @@ -0,0 +1,142 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.jms.management; + +import org.apache.activemq6.utils.json.JSONArray; +import org.apache.activemq6.utils.json.JSONObject; + +/** + * Helper class to create Java Objects from the + * JSON serialization returned by {@link TopicControl#listAllSubscriptionsAsJSON()} and related methods. 
+ * + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + */ +public class SubscriptionInfo +{ + private final String queueName; + + private final String clientID; + + private final String name; + + private final boolean durable; + + private final String selector; + + private final int messageCount; + + private final int deliveringCount; + + // Static -------------------------------------------------------- + + /** + * Returns an array of SubscriptionInfo corresponding to the JSON serialization returned + * by {@link TopicControl#listAllSubscriptionsAsJSON()} and related methods. + */ + public static SubscriptionInfo[] from(final String jsonString) throws Exception + { + JSONArray array = new JSONArray(jsonString); + SubscriptionInfo[] infos = new SubscriptionInfo[array.length()]; + for (int i = 0; i < array.length(); i++) + { + JSONObject sub = array.getJSONObject(i); + SubscriptionInfo info = new SubscriptionInfo(sub.getString("queueName"), + sub.optString("clientID", null), + sub.optString("name", null), + sub.getBoolean("durable"), + sub.optString("selector", null), + sub.getInt("messageCount"), + sub.getInt("deliveringCount")); + infos[i] = info; + } + + return infos; + } + + // Constructors -------------------------------------------------- + + private SubscriptionInfo(final String queueName, + final String clientID, + final String name, + final boolean durable, + final String selector, + final int messageCount, + final int deliveringCount) + { + this.queueName = queueName; + this.clientID = clientID; + this.name = name; + this.durable = durable; + this.selector = selector; + this.messageCount = messageCount; + this.deliveringCount = deliveringCount; + } + + // Public -------------------------------------------------------- + + /** + * Returns the name of the HornetQ core queue corresponding to this subscription. + */ + public String getQueueName() + { + return queueName; + } + + /** + * Returns the client ID of this subscription or {@code null}. 
+ */ + public String getClientID() + { + return clientID; + } + + /** + * Returns the name of this subscription. + */ + public String getName() + { + return name; + } + + /** + * Returns whether this subscription is durable. + */ + public boolean isDurable() + { + return durable; + } + + /** + * Returns the JMS message selector associated to this subscription. + */ + public String getSelector() + { + return selector; + } + + /** + * Returns the number of messages currently held by this subscription. + */ + public int getMessageCount() + { + return messageCount; + } + + /** + * Returns the number of messages currently delivered to this subscription. + */ + public int getDeliveringCount() + { + return deliveringCount; + } +}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java new file mode 100644 index 0000000..1ef88c8 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/TopicControl.java @@ -0,0 +1,149 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.api.jms.management;

import java.util.Map;

import javax.management.MBeanOperationInfo;

import org.apache.activemq6.api.core.management.Operation;
import org.apache.activemq6.api.core.management.Parameter;

/**
 * A TopicControl is used to manage a JMS Topic.
 * <p>
 * The interface exposes read-only counters, JNDI binding management, subscription listings
 * (as {@code Object[]} or as JSON text) and operations that drop subscriptions.
 *
 * @author Jeff Mesnil
 */
public interface TopicControl extends DestinationControl
{

   /**
    * Returns the number of (durable and non-durable) subscribers for this topic.
    */
   int getSubscriptionCount();

   /**
    * Returns the number of <em>durable</em> subscribers for this topic.
    */
   int getDurableSubscriptionCount();

   /**
    * Returns the number of <em>non-durable</em> subscribers for this topic.
    */
   int getNonDurableSubscriptionCount();

   /**
    * Returns the number of messages for all <em>durable</em> subscribers for this topic.
    */
   int getDurableMessageCount();

   /**
    * Returns the number of messages for all <em>non-durable</em> subscribers for this topic.
    */
   int getNonDurableMessageCount();

   /**
    * Returns the JNDI bindings associated with this topic.
    */
   @Operation(desc = "Returns the list of JNDI bindings associated")
   String[] getJNDIBindings();

   /**
    * Adds a JNDI binding to this destination.
    * <p>
    * NOTE(review): the operation description below says "queue" although this control manages a
    * topic; it looks like a copy-paste from the queue control. Confirm before changing the
    * string, since it is visible to management clients.
    *
    * @param jndi the name of the binding for JNDI
    * @throws Exception if the binding cannot be added
    */
   @Operation(desc = "Adds the queue to another JNDI binding")
   void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;



   // Operations ----------------------------------------------------

   /**
    * Lists all the subscriptions for this topic (both durable and non-durable).
    *
    * @return one array element per subscription
    * @throws Exception if the subscriptions cannot be listed
    */
   @Operation(desc = "List all subscriptions")
   Object[] listAllSubscriptions() throws Exception;

   /**
    * Lists all the subscriptions for this topic (both durable and non-durable) using JSON serialization.
    * <br>
    * Java objects can be recreated from JSON serialization using {@link SubscriptionInfo#from(String)}.
    *
    * @return the JSON text describing the subscriptions
    * @throws Exception if the subscriptions cannot be listed
    */
   @Operation(desc = "List all subscriptions")
   String listAllSubscriptionsAsJSON() throws Exception;

   /**
    * Lists all the <em>durable</em> subscriptions for this topic.
    *
    * @return one array element per durable subscription
    * @throws Exception if the subscriptions cannot be listed
    */
   @Operation(desc = "List only the durable subscriptions")
   Object[] listDurableSubscriptions() throws Exception;

   /**
    * Lists all the <em>durable</em> subscriptions using JSON serialization.
    * <br>
    * Java objects can be recreated from JSON serialization using {@link SubscriptionInfo#from(String)}.
    *
    * @return the JSON text describing the durable subscriptions
    * @throws Exception if the subscriptions cannot be listed
    */
   @Operation(desc = "List only the durable subscriptions")
   String listDurableSubscriptionsAsJSON() throws Exception;

   /**
    * Lists all the <em>non-durable</em> subscriptions for this topic.
    *
    * @return one array element per non-durable subscription
    * @throws Exception if the subscriptions cannot be listed
    */
   @Operation(desc = "List only the non durable subscriptions")
   Object[] listNonDurableSubscriptions() throws Exception;

   /**
    * Lists all the <em>non-durable</em> subscriptions using JSON serialization.
    * <br>
    * Java objects can be recreated from JSON serialization using {@link SubscriptionInfo#from(String)}.
    *
    * @return the JSON text describing the non-durable subscriptions
    * @throws Exception if the subscriptions cannot be listed
    */
   @Operation(desc = "List only the non durable subscriptions")
   String listNonDurableSubscriptionsAsJSON() throws Exception;

   /**
    * Lists all the messages held by the queue that represents the given subscription.
    * <br>
    * 1 Map represents 1 message, keys are the message's properties and headers, values are the corresponding values.
    *
    * @param queueName the name of the queue representing a subscription
    * @return one map per message
    * @throws Exception if the messages cannot be listed
    */
   @Operation(desc = "List all the message for the given subscription")
   Map<String, Object>[] listMessagesForSubscription(@Parameter(name = "queueName", desc = "the name of the queue representing a subscription") String queueName) throws Exception;

   /**
    * Lists all the messages held by the queue that represents the given subscription, using JSON serialization.
    *
    * @param queueName the name of the queue representing a subscription
    * @return the JSON text describing the messages
    * @throws Exception if the messages cannot be listed
    */
   @Operation(desc = "List all the message for the given subscription")
   String listMessagesForSubscriptionAsJSON(@Parameter(name = "queueName", desc = "the name of the queue representing a subscription") String queueName) throws Exception;

   /**
    * Counts the number of messages in the subscription identified by the given client ID and subscription name.
    * Only messages matching the filter will be counted.
    * <br>
    * Using {@code null} or an empty filter will count <em>all</em> messages of that subscription.
    *
    * @param clientID         the client ID
    * @param subscriptionName the name of the durable subscription
    * @param filter           a JMS filter (can be empty)
    * @return the number of matching messages
    * @throws Exception if the messages cannot be counted
    */
   @Operation(desc = "Count the number of messages matching the filter for the given subscription")
   int countMessagesForSubscription(@Parameter(name = "clientID", desc = "the client ID") String clientID,
                                    @Parameter(name = "subscriptionName", desc = "the name of the durable subscription") String subscriptionName,
                                    @Parameter(name = "filter", desc = "a JMS filter (can be empty)") String filter) throws Exception;

   /**
    * Drops the durable subscription identified by the given client ID and subscription name.
    *
    * @param clientID         the client ID
    * @param subscriptionName the name of the durable subscription
    * @throws Exception if the subscription cannot be dropped
    */
   @Operation(desc = "Drop a durable subscription", impact = MBeanOperationInfo.ACTION)
   void dropDurableSubscription(@Parameter(name = "clientID", desc = "the client ID") String clientID,
                                @Parameter(name = "subscriptionName", desc = "the name of the durable subscription") String subscriptionName) throws Exception;

   /**
    * Drops all subscriptions.
    *
    * @throws Exception if the subscriptions cannot be dropped
    */
   @Operation(desc = "Drop all subscriptions from this topic", impact = MBeanOperationInfo.ACTION)
   void dropAllSubscriptions() throws Exception;
}
// http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java new file mode 100644 index 0000000..1e75322 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/management/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * Management API for HornetQ JMS resources. + * <br> + * HornetQ JMS resources can be managed either using JMX or by sending JMS management messages to the + * server's special management address. Please refer to the user manual for more information. + */ +package org.apache.activemq6.api.jms.management; + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java new file mode 100644 index 0000000..ef279c2 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/api/jms/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * API to create HornetQ JMS resources. 
+ * <br> + * This package contains classes to create + * HornetQ JMS managed resources (ConnectionFactory, Queue and Topic). + * + */ +package org.apache.activemq6.api.jms; + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java new file mode 100644 index 0000000..c916af2 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQBytesMessage.java @@ -0,0 +1,436 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.client; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.core.message.impl.MessageImpl; + +import static org.apache.activemq6.reader.BytesMessageUtil.bytesMessageReset; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBoolean; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadByte; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBytes; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadChar; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadDouble; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadFloat; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadInt; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadLong; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadShort; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUTF; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedByte; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedShort; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBoolean; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteByte; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBytes; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteChar; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteDouble; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteFloat; 
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteInt; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteLong; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteObject; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteShort; +import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteUTF; + +/** + * HornetQ implementation of a JMS {@link BytesMessage}. + * + * @author Norbert Lataille ([email protected]) + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public class HornetQBytesMessage extends HornetQMessage implements BytesMessage +{ + // Static ------------------------------------------------------- + public static final byte TYPE = Message.BYTES_TYPE; + + // Attributes ---------------------------------------------------- + + private int bodyLength; + + // Constructor --------------------------------------------------- + + /** + * This constructor is used to construct messages prior to sending + */ + protected HornetQBytesMessage(final ClientSession session) + { + super(HornetQBytesMessage.TYPE, session); + } + + /** + * Constructor on receipt at client side + */ + protected HornetQBytesMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + /** + * Foreign message constructor + */ + public HornetQBytesMessage(final BytesMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, HornetQBytesMessage.TYPE, session); + + foreign.reset(); + + byte[] buffer = new byte[1024]; + int n = foreign.readBytes(buffer); + while (n != -1) + { + writeBytes(buffer, 0, n); + n = foreign.readBytes(buffer); + } + } + + // BytesMessage implementation ----------------------------------- + + public boolean 
readBoolean() throws JMSException + { + checkRead(); + try + { + return bytesReadBoolean(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public byte readByte() throws JMSException + { + checkRead(); + try + { + return bytesReadByte(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readUnsignedByte() throws JMSException + { + checkRead(); + try + { + return bytesReadUnsignedByte(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public short readShort() throws JMSException + { + checkRead(); + try + { + return bytesReadShort(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readUnsignedShort() throws JMSException + { + checkRead(); + try + { + return bytesReadUnsignedShort(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public char readChar() throws JMSException + { + checkRead(); + try + { + return bytesReadChar(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readInt() throws JMSException + { + checkRead(); + try + { + return bytesReadInt(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public long readLong() throws JMSException + { + checkRead(); + try + { + return bytesReadLong(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public float readFloat() throws JMSException + { + checkRead(); + try + { + return bytesReadFloat(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public double readDouble() throws JMSException + { + checkRead(); + try + { + return bytesReadDouble(message); + } + catch (IndexOutOfBoundsException e) + { + throw new 
MessageEOFException(""); + } + } + + public String readUTF() throws JMSException + { + checkRead(); + try + { + return bytesReadUTF(message); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + catch (Exception e) + { + JMSException je = new JMSException("Failed to get UTF"); + je.setLinkedException(e); + je.initCause(e); + throw je; + } + } + + public int readBytes(final byte[] value) throws JMSException + { + checkRead(); + return bytesReadBytes(message, value); + } + + public int readBytes(final byte[] value, final int length) throws JMSException + { + checkRead(); + return bytesReadBytes(message, value, length); + + } + + public void writeBoolean(final boolean value) throws JMSException + { + checkWrite(); + bytesWriteBoolean(message, value); + } + + public void writeByte(final byte value) throws JMSException + { + checkWrite(); + bytesWriteByte(message, value); + } + + public void writeShort(final short value) throws JMSException + { + checkWrite(); + bytesWriteShort(message, value); + } + + public void writeChar(final char value) throws JMSException + { + checkWrite(); + bytesWriteChar(message, value); + } + + public void writeInt(final int value) throws JMSException + { + checkWrite(); + bytesWriteInt(message, value); + } + + public void writeLong(final long value) throws JMSException + { + checkWrite(); + bytesWriteLong(message, value); + } + + public void writeFloat(final float value) throws JMSException + { + checkWrite(); + bytesWriteFloat(message, value); + } + + public void writeDouble(final double value) throws JMSException + { + checkWrite(); + bytesWriteDouble(message, value); + } + + public void writeUTF(final String value) throws JMSException + { + checkWrite(); + try + { + bytesWriteUTF(message, value); + } + catch (Exception e) + { + JMSException je = new JMSException("Failed to write UTF"); + je.setLinkedException(e); + je.initCause(e); + throw je; + } + + } + + public void writeBytes(final byte[] value) 
throws JMSException + { + checkWrite(); + bytesWriteBytes(message, value); + } + + public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException + { + checkWrite(); + bytesWriteBytes(message, value, offset, length); + } + + public void writeObject(final Object value) throws JMSException + { + checkWrite(); + if (!bytesWriteObject(message, value)) + { + throw new MessageFormatException("Invalid object for properties"); + } + } + + public void reset() throws JMSException + { + if (!readOnly) + { + readOnly = true; + + bodyLength = message.getBodySize(); + } + + bytesMessageReset(message); + } + + @Override + public void doBeforeReceive() throws HornetQException + { + bodyLength = message.getBodySize(); + } + + // HornetQRAMessage overrides ---------------------------------------- + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + try + { + getBuffer().clear(); + } + catch (RuntimeException e) + { + JMSException e2 = new JMSException(e.getMessage()); + e2.initCause(e); + throw e2; + } + } + + public long getBodyLength() throws JMSException + { + checkRead(); + + return bodyLength; + } + + @Override + public void doBeforeSend() throws Exception + { + reset(); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return HornetQBytesMessage.TYPE; + } + + private HornetQBuffer getBuffer() + { + return message.getBodyBuffer(); + } + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") + Class c) + { + return c.isAssignableFrom(byte[].class); + } + + @Override + protected <T> T getBodyInternal(Class<T> c) + { + if (bodyLength == 0) + return null; + byte[] dst = new byte[bodyLength]; + message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst); + return (T)dst; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java new file mode 100644 index 0000000..bd59895 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnection.java @@ -0,0 +1,862 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.client; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.InvalidClientIDException; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import java.lang.ref.WeakReference; +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQExceptionType; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.FailoverEventListener; +import org.apache.activemq6.api.core.client.FailoverEventType; +import org.apache.activemq6.api.core.client.SessionFailureListener; +import org.apache.activemq6.api.jms.HornetQJMSConstants; +import org.apache.activemq6.core.client.impl.ClientSessionInternal; +import org.apache.activemq6.core.version.Version; +import org.apache.activemq6.reader.MessageUtil; +import org.apache.activemq6.utils.ConcurrentHashSet; +import org.apache.activemq6.utils.UUIDGenerator; +import org.apache.activemq6.utils.VersionLoader; + +/** + * HornetQ implementation of a JMS Connection. + * <p> + * The flat implementation of {@link TopicConnection} and {@link QueueConnection} is per design, + * following the common usage of these as one flat API in JMS 1.1. 
+ * + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public class HornetQConnection extends HornetQConnectionForContextImpl implements TopicConnection, QueueConnection +{ + // Constants ------------------------------------------------------------------------------------ + public static final int TYPE_GENERIC_CONNECTION = 0; + + public static final int TYPE_QUEUE_CONNECTION = 1; + + public static final int TYPE_TOPIC_CONNECTION = 2; + + public static final String EXCEPTION_FAILOVER = "FAILOVER"; + + public static final String EXCEPTION_DISCONNECT = "DISCONNECT"; + + public static final SimpleString CONNECTION_ID_PROPERTY_NAME = MessageUtil.CONNECTION_ID_PROPERTY_NAME; + + // Static --------------------------------------------------------------------------------------- + + // Attributes ----------------------------------------------------------------------------------- + + private final int connectionType; + + private final Set<HornetQSession> sessions = new org.apache.activemq6.utils.ConcurrentHashSet<HornetQSession>(); + + private final Set<SimpleString> tempQueues = new org.apache.activemq6.utils.ConcurrentHashSet<SimpleString>(); + + private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<SimpleString>(); + + private volatile boolean hasNoLocal; + + private volatile ExceptionListener exceptionListener; + + private volatile FailoverEventListener failoverEventListener; + + private volatile boolean justCreated = true; + + private volatile ConnectionMetaData metaData; + + private volatile boolean closed; + + private volatile boolean started; + + private String clientID; + + private final ClientSessionFactory sessionFactory; + + private final SimpleString uid; + + private final String username; + + private final String password; + + private final SessionFailureListener listener = new 
JMSFailureListener(this);

// Core-level failover listener registered on every session this connection creates; it forwards
// events to the user-supplied listener stored in failoverEventListener.
private final FailoverEventListener failoverListener = new FailoverEventListenerImpl(this);

// Client version, handed to HornetQConnectionMetaData when getMetaData() is first called.
private final Version thisVersion;

// Ack batch size used for DUPS_OK_ACKNOWLEDGE sessions.
private final int dupsOKBatchSize;

// Ack batch size used for transacted, client-ack, individual-ack and pre-ack sessions.
private final int transactionBatchSize;

// Session created by authorize(); used for client-id metadata and temporary queue cleanup.
// Stays null until authorize() has been called.
private ClientSession initialSession;

// Captured at construction time so finalize() can report where a leaked connection was created.
private final Exception creationStack;

private HornetQConnectionFactory factoryReference;

// Constructors ---------------------------------------------------------------------------------

/**
 * Creates a connection on top of the given core session factory.
 * <p>
 * Besides storing its arguments, the constructor generates the connection UID, loads the client
 * version and records the creation stack trace. No session is created here; see {@code authorize()}.
 *
 * @param username             user name passed to every core session created by this connection
 * @param password             password passed to every core session created by this connection
 * @param connectionType       one of the {@code TYPE_*_CONNECTION} constants
 * @param clientID             pre-configured JMS client ID, or {@code null}
 * @param dupsOKBatchSize      ack batch size for DUPS_OK_ACKNOWLEDGE sessions
 * @param transactionBatchSize ack batch size for the other batching session kinds
 * @param sessionFactory       core session factory backing this connection
 */
public HornetQConnection(final String username, final String password, final int connectionType,
                         final String clientID, final int dupsOKBatchSize, final int transactionBatchSize,
                         final ClientSessionFactory sessionFactory)
{
   this.username = username;

   this.password = password;

   this.connectionType = connectionType;

   this.clientID = clientID;

   this.sessionFactory = sessionFactory;

   uid = UUIDGenerator.getInstance().generateSimpleStringUUID();

   thisVersion = VersionLoader.getVersion();

   this.dupsOKBatchSize = dupsOKBatchSize;

   this.transactionBatchSize = transactionBatchSize;

   creationStack = new Exception();
}

/**
 * Internal entry point that mainly serves the Resource Adapter.
 * <p>
 * The resource adapter works with both an XA session and a non-XA session. When no transaction
 * is enlisted, the EE specification mandates that commit behaves as on a non-XA session
 * (i.e. SessionTransacted). Since the JMS Javadoc mandates that {@code createSession} returns an
 * XASession on XA connections, this method exists to force a non-XA, generic session.
 *
 * @throws JMSException if the connection is closed or the session cannot be created
 */
public Session createNonXASession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
   checkClosed();

   return createSessionInternal(false, transacted, acknowledgeMode, HornetQConnection.TYPE_GENERIC_CONNECTION);
}

/**
 * Topic flavour of {@link #createNonXASession(boolean, int)}: forces a non-XA session created
 * with {@code TYPE_TOPIC_CONNECTION}, for the same Resource Adapter reasons.
 *
 * @throws JMSException if the connection is closed or the session cannot be created
 */
public Session createNonXATopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
   checkClosed();

   return createSessionInternal(false, transacted, acknowledgeMode, HornetQConnection.TYPE_TOPIC_CONNECTION);
}

/**
 * Queue flavour of {@link #createNonXASession(boolean, int)}: forces a non-XA session created
 * with {@code TYPE_QUEUE_CONNECTION}, for the same Resource Adapter reasons.
 *
 * @throws JMSException if the connection is closed or the session cannot be created
 */
public Session createNonXAQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
   checkClosed();

   return createSessionInternal(false, transacted, acknowledgeMode, HornetQConnection.TYPE_QUEUE_CONNECTION);
}


// Connection implementation --------------------------------------------------------------------

/**
 * Creates a generic session; the acknowledge mode is first normalised by
 * {@link #checkAck(boolean, int)}.
 */
public synchronized Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
   checkClosed();

   return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), HornetQConnection.TYPE_GENERIC_CONNECTION);
}

public String getClientID() throws JMSException
{
   checkClosed();

   return clientID;
}

/**
 * Sets the JMS client ID. Only allowed while no client ID is set and before any other operation
 * has cleared the "just created" flag.
 * <p>
 * The ID is first registered as unique metadata on the initial session so that a duplicate is
 * detected, then stored locally and published as regular session metadata.
 *
 * @throws InvalidClientIDException if the ID is already registered as unique metadata
 * @throws JMSException             if the connection is closed, the call comes too late, or the
 *                                  metadata cannot be set
 */
public void setClientID(final String clientID) throws JMSException
{
   checkClosed();

   if (this.clientID != null)
   {
      throw new IllegalStateException("Client id has already been set");
   }

   if (!justCreated)
   {
      throw new IllegalStateException("setClientID can only be called directly after the connection is created");
   }

   try
   {
      initialSession.addUniqueMetaData("jms-client-id", clientID);
   }
   catch (HornetQException e)
   {
      if (e.getType() == HornetQExceptionType.DUPLICATE_METADATA)
      {
         throw new InvalidClientIDException("clientID=" + clientID + " was already set into another connection");
      }

      // Any other failure used to be dropped silently, leaving the caller believing the
      // uniqueness check had succeeded. Surface it instead.
      throw JMSExceptionHelper.convertFromHornetQException(e);
   }

   this.clientID = clientID;
   try
   {
      this.addSessionMetaData(initialSession);
   }
   catch (HornetQException e)
   {
      JMSException ex = new JMSException("Internal error setting metadata jms-client-id");
      ex.setLinkedException(e);
      ex.initCause(e);
      throw ex;
   }

   justCreated = false;
}

/**
 * Returns the connection metadata, creating it on first use.
 */
public ConnectionMetaData getMetaData() throws JMSException
{
   checkClosed();

   justCreated = false;

   if (metaData == null)
   {
      metaData = new HornetQConnectionMetaData(thisVersion);
   }

   return metaData;
}

public ExceptionListener getExceptionListener() throws JMSException
{
   checkClosed();

   justCreated = false;

   return exceptionListener;
}

public void setExceptionListener(final ExceptionListener listener) throws JMSException
{
   checkClosed();

   exceptionListener = listener;
   justCreated = false;
}

/**
 * Starts every session currently registered on this connection and marks the connection started.
 */
public synchronized void start() throws JMSException
{
   checkClosed();

   for (HornetQSession session : sessions)
   {
      session.start();
   }

   justCreated = false;
   started = true;
}

/**
 * Sets the stop signal on every registered session whose core session is a
 * {@code ClientSessionInternal}; other core sessions are skipped.
 */
public synchronized void signalStopToAllSessions()
{
   for (HornetQSession session : sessions)
   {
      ClientSession coreSession = session.getCoreSession();
      if (coreSession instanceof ClientSessionInternal)
      {
         ClientSessionInternal internalSession = (ClientSessionInternal) coreSession;
         internalSession.setStopSignal();
      }
   }

}

/**
 * Stops every registered session and marks the connection stopped. Must not be called from a
 * message listener thread.
 */
public synchronized void stop() throws JMSException
{
   threadAwareContext.assertNotMessageListenerThread();

   checkClosed();

   for (HornetQSession session : sessions)
   {
      session.stop();
   }

   justCreated = false;
   started = false;
}

/**
 * Closes the connection: closes the session factory, closes every session, deletes the temporary
 * queues through the initial session and finally closes the initial session.
 * <p>
 * Calling it on an already closed connection is a no-op. The connection is only marked closed
 * when the whole sequence completed without an exception.
 *
 * @throws JMSException if a session fails to close or a core error occurs
 */
public final synchronized void close() throws JMSException
{
   threadAwareContext.assertNotCompletionListenerThread();
   threadAwareContext.assertNotMessageListenerThread();

   if (closed)
   {
      return;
   }

   sessionFactory.close();

   try
   {
      // Iterate over a snapshot; presumably session.close() calls removeSession() and would
      // otherwise modify the collection being iterated.
      for (HornetQSession session : new HashSet<HornetQSession>(sessions))
      {
         session.close();
      }

      try
      {
         // initialSession is only assigned by authorize(), so it can still be null here.
         if (initialSession != null && !tempQueues.isEmpty())
         {
            // Remove any temporary queues

            for (SimpleString queueName : tempQueues)
            {
               if (!initialSession.isClosed())
               {
                  try
                  {
                     initialSession.deleteQueue(queueName);
                  }
                  catch (HornetQException ignore)
                  {
                     // Exception on deleting queue shouldn't prevent close from completing
                  }
               }
            }
         }
      }
      finally
      {
         if (initialSession != null)
         {
            initialSession.close();
         }
      }

      closed = true;
   }
   catch (HornetQException e)
   {
      throw JMSExceptionHelper.convertFromHornetQException(e);
   }
}

/**
 * Not implemented: validates the arguments and returns {@code null}.
 */
public ConnectionConsumer
createConnectionConsumer(final Destination destination, final String messageSelector,
                         final ServerSessionPool sessionPool, final int maxMessages) throws JMSException
{
   checkClosed();

   checkTempQueues(destination);

   // We offer a RA, so no need to implement this for MDBs
   return null;
}

/**
 * Rejects a temporary destination that was not registered on this connection through
 * {@link #addTemporaryQueue(SimpleString)}.
 *
 * @throws JMSException if the destination is temporary and unknown to this connection
 */
private void checkTempQueues(Destination destination) throws JMSException
{
   HornetQDestination jbdest = (HornetQDestination) destination;

   if (jbdest.isTemporary() && !containsTemporaryQueue(jbdest.getSimpleAddress()))
   {
      throw new JMSException("Can not create consumer for temporary destination " + destination +
                                " from another JMS connection");
   }
}

/**
 * Not implemented: validates the arguments and returns {@code null}. Rejected outright on a
 * queue connection.
 */
public ConnectionConsumer
createDurableConnectionConsumer(final Topic topic, final String subscriptionName,
                                final String messageSelector, final ServerSessionPool sessionPool,
                                final int maxMessages) throws JMSException
{
   checkClosed();
   // As spec. section 4.11
   if (connectionType == HornetQConnection.TYPE_QUEUE_CONNECTION)
   {
      String msg = "Cannot create a durable connection consumer on a QueueConnection";
      throw new javax.jms.IllegalStateException(msg);
   }
   checkTempQueues(topic);
   // We offer RA, so no need for this
   return null;
}

/**
 * Creates a generic session whose transacted flag is derived from {@code sessionMode}.
 */
@Override
public Session createSession(int sessionMode) throws JMSException
{
   checkClosed();
   return createSessionInternal(false, sessionMode == Session.SESSION_TRANSACTED, sessionMode, HornetQSession.TYPE_GENERIC_SESSION);

}

/**
 * Creates a generic, non-transacted, auto-acknowledge session.
 */
@Override
public Session createSession() throws JMSException
{
   checkClosed();
   return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE, HornetQSession.TYPE_GENERIC_SESSION);
}

// QueueConnection implementation ---------------------------------------------------------------

public QueueSession createQueueSession(final boolean transacted, int acknowledgeMode) throws JMSException
{
   checkClosed();
   return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), HornetQSession.TYPE_QUEUE_SESSION);
}

/**
 * Normalises the acknowledge mode of a non-transacted session: {@code SESSION_TRANSACTED}
 * combined with {@code transacted == false} becomes {@code AUTO_ACKNOWLEDGE}; every other
 * combination is returned unchanged.
 * <p>
 * I'm keeping this as static as the same check will be done within RA.
 * This is to conform with TCK Tests where we must return ackMode exactly as they want if transacted=false
 */
public static int checkAck(boolean transacted, int acknowledgeMode)
{
   if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED)
   {
      return Session.AUTO_ACKNOWLEDGE;
   }

   return acknowledgeMode;
}

/**
 * Not implemented: validates the arguments and returns {@code null}.
 */
public ConnectionConsumer
createConnectionConsumer(final Queue queue, final String messageSelector,
                         final ServerSessionPool sessionPool, final int maxMessages) throws JMSException
{
   checkClosed();
   checkTempQueues(queue);
   return null;
}

// TopicConnection implementation ---------------------------------------------------------------

public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
{
   checkClosed();
   return createSessionInternal(false, transacted, checkAck(transacted, acknowledgeMode), HornetQSession.TYPE_TOPIC_SESSION);
}

/**
 * Not implemented: validates the arguments and returns {@code null}.
 */
public ConnectionConsumer
createConnectionConsumer(final Topic topic, final String messageSelector,
                         final ServerSessionPool sessionPool, final int maxMessages) throws JMSException
{
   checkClosed();
   checkTempQueues(topic);
   return null;
}

@Override
public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
   return null; // we offer RA
}

@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
   return null; // we offer RA
}

// Public ---------------------------------------------------------------------------------------

/**
 * Sets the {@link FailoverEventListener} that is notified of failover events on this connection.
 *
 * @param listener the listener to set
 * @throws JMSException if the connection is closed
 */
public void setFailoverListener(final FailoverEventListener listener) throws JMSException
{
   checkClosed();

   justCreated = false;

   this.failoverEventListener = listener;

}

/**
 * @return {@link FailoverEventListener} the current failover event listener for this connection
 * @throws JMSException if the connection is closed
 */
public FailoverEventListener getFailoverListener() throws JMSException
{
   checkClosed();

   justCreated = false;

   return failoverEventListener;
}

/**
 * Registers a temporary queue owned by this connection; the address also becomes a known
 * destination.
 */
public void addTemporaryQueue(final SimpleString queueAddress)
{
   tempQueues.add(queueAddress);
   knownDestinations.add(queueAddress);
}

/**
 * Unregisters a temporary queue. The address is not removed from the known destinations.
 */
public void removeTemporaryQueue(final SimpleString queueAddress)
{
   tempQueues.remove(queueAddress);
}

public void addKnownDestination(final SimpleString address)
{
   knownDestinations.add(address);
}

public boolean containsKnownDestination(final SimpleString address)
{
   return knownDestinations.contains(address);
}

public boolean containsTemporaryQueue(final SimpleString queueAddress)
{
   return tempQueues.contains(queueAddress);
}

public boolean hasNoLocal()
{
   return hasNoLocal;
}

public void setHasNoLocal()
{
   hasNoLocal = true;
}

public SimpleString getUID()
{
   return uid;
}

public void removeSession(final HornetQSession session)
{
   sessions.remove(session);
}

public ClientSession getInitialSession()
{
   return initialSession;
}

// Package protected ----------------------------------------------------------------------------

// Protected ------------------------------------------------------------------------------------

// In case the user forgets to close the connection manually

/**
 * Safety net: logs the creation stack trace of a connection that was never closed, then closes it.
 */
@Override
protected final void finalize() throws Throwable
{
   if (!closed)
   {
      HornetQJMSClientLogger.LOGGER.connectionLeftOpen(creationStack);

      close();
   }
}

protected boolean isXA() + { + return false; + } + + protected final HornetQSession + createSessionInternal(final boolean isXA, final boolean transacted, int acknowledgeMode, final int type) throws JMSException + { + if (transacted) + { + acknowledgeMode = Session.SESSION_TRANSACTED; + } + + try + { + ClientSession session; + + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + session = + sessionFactory.createSession(username, password, isXA, false, false, + sessionFactory.getServerLocator().isPreAcknowledge(), + transactionBatchSize); + } + else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, true, + sessionFactory.getServerLocator().isPreAcknowledge(), 0); + } + else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, true, + sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize); + } + else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, false, + sessionFactory.getServerLocator().isPreAcknowledge(), + transactionBatchSize); + } + else if (acknowledgeMode == HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) + { + session = + sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize); + } + else if (acknowledgeMode == HornetQJMSConstants.PRE_ACKNOWLEDGE) + { + session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize); + } + else + { + throw new JMSRuntimeException("Invalid ackmode: " + acknowledgeMode); + } + + justCreated = false; + + // Setting multiple times on different sessions doesn't matter since RemotingConnection + // maintains + // a set (no duplicates) + session.addFailureListener(listener); + session.addFailoverListener(failoverListener); + + HornetQSession jbs = createHQSession(isXA, transacted, acknowledgeMode, 
session, type); + + sessions.add(jbs); + + if (started) + { + session.start(); + } + + this.addSessionMetaData(session); + + return jbs; + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + // Private -------------------------------------------------------------------------------------- + + /** + * @param transacted + * @param acknowledgeMode + * @param session + * @param type + * @return + */ + protected HornetQSession createHQSession(boolean isXA, boolean transacted, int acknowledgeMode, ClientSession session, int type) + { + if (isXA) + { + return new HornetQXASession(this, transacted, true, acknowledgeMode, session, type); + } + else + { + return new HornetQSession(this, transacted, false, acknowledgeMode, session, type); + } + } + + protected final void checkClosed() throws JMSException + { + if (closed) + { + throw new IllegalStateException("Connection is closed"); + } + } + + public void authorize() throws JMSException + { + try + { + initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0); + + addSessionMetaData(initialSession); + + initialSession.addFailureListener(listener); + initialSession.addFailoverListener(failoverListener); + } + catch (HornetQException me) + { + throw JMSExceptionHelper.convertFromHornetQException(me); + } + } + + private void addSessionMetaData(ClientSession session) throws HornetQException + { + session.addMetaData("jms-session", ""); + if (clientID != null) + { + session.addMetaData("jms-client-id", clientID); + } + } + + public void setReference(HornetQConnectionFactory factory) + { + this.factoryReference = factory; + } + + public boolean isStarted() + { + return started; + } + + + // Inner classes -------------------------------------------------------------------------------- + + private static class JMSFailureListener implements SessionFailureListener + { + private final WeakReference<HornetQConnection> connectionRef; + + 
JMSFailureListener(final HornetQConnection connection) + { + connectionRef = new WeakReference<HornetQConnection>(connection); + } + + @Override + public synchronized void connectionFailed(final HornetQException me, boolean failedOver) + { + if (me == null) + { + return; + } + + HornetQConnection conn = connectionRef.get(); + + if (conn != null) + { + try + { + final ExceptionListener exceptionListener = conn.getExceptionListener(); + + if (exceptionListener != null) + { + final JMSException je = + new JMSException(me.toString(), failedOver ? EXCEPTION_FAILOVER : EXCEPTION_DISCONNECT); + + je.initCause(me); + + new Thread(new Runnable() + { + public void run() + { + exceptionListener.onException(je); + } + }).start(); + } + } + catch (JMSException e) + { + if (!conn.closed) + { + HornetQJMSClientLogger.LOGGER.errorCallingExcListener(e); + } + } + } + } + + @Override + public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID) + { + connectionFailed(me, failedOver); + } + + public void beforeReconnect(final HornetQException me) + { + + } + + } + + private static class FailoverEventListenerImpl implements FailoverEventListener + { + private final WeakReference<HornetQConnection> connectionRef; + + FailoverEventListenerImpl(final HornetQConnection connection) + { + connectionRef = new WeakReference<HornetQConnection>(connection); + } + + @Override + public void failoverEvent(final FailoverEventType eventType) + { + HornetQConnection conn = connectionRef.get(); + + if (conn != null) + { + try + { + final FailoverEventListener failoverListener = conn.getFailoverListener(); + + if (failoverListener != null) + { + + new Thread(new Runnable() + { + public void run() + { + failoverListener.failoverEvent(eventType); + } + }).start(); + } + } + catch (JMSException e) + { + if (!conn.closed) + { + HornetQJMSClientLogger.LOGGER.errorCallingFailoverListener(e); + } + } + } + + } + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java new file mode 100644 index 0000000..9fe7aae --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionFactory.java @@ -0,0 +1,821 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.apache.activemq6.jms.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.JMSSecurityException; +import javax.jms.JMSSecurityRuntimeException; +import javax.jms.QueueConnection; +import javax.jms.TopicConnection; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XAJMSContext; +import javax.jms.XAQueueConnection; +import javax.jms.XATopicConnection; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import java.io.Serializable; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ClientSessionFactory; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.jms.JMSFactoryType; +import org.apache.activemq6.jms.referenceable.ConnectionFactoryObjectFactory; +import org.apache.activemq6.jms.referenceable.SerializableObjectRefAddr; + +/** + * HornetQ implementation of a JMS ConnectionFactory. 
+ * + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public class HornetQConnectionFactory implements Serializable, Referenceable, ConnectionFactory, XAConnectionFactory +{ + private static final long serialVersionUID = -2810634789345348326L; + + private final ServerLocator serverLocator; + + private String clientID; + + private int dupsOKBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE; + + private int transactionBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE; + + private boolean readOnly; + + public HornetQConnectionFactory() + { + serverLocator = null; + } + + public HornetQConnectionFactory(final ServerLocator serverLocator) + { + this.serverLocator = serverLocator; + + serverLocator.disableFinalizeCheck(); + } + + public HornetQConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + if (ha) + { + serverLocator = HornetQClient.createServerLocatorWithHA(groupConfiguration); + } + else + { + serverLocator = HornetQClient.createServerLocatorWithoutHA(groupConfiguration); + } + + serverLocator.disableFinalizeCheck(); + } + + public HornetQConnectionFactory(final boolean ha, final TransportConfiguration... 
initialConnectors) + { + if (ha) + { + serverLocator = HornetQClient.createServerLocatorWithHA(initialConnectors); + } + else + { + serverLocator = HornetQClient.createServerLocatorWithoutHA(initialConnectors); + } + + serverLocator.disableFinalizeCheck(); + } + + // ConnectionFactory implementation ------------------------------------------------------------- + + public Connection createConnection() throws JMSException + { + return createConnection(null, null); + } + + public Connection createConnection(final String username, final String password) throws JMSException + { + return createConnectionInternal(username, password, false, HornetQConnection.TYPE_GENERIC_CONNECTION); + } + + @Override + public JMSContext createContext() + { + return createContext(null, null); + } + + @Override + public JMSContext createContext(final int sessionMode) + { + return createContext(null, null, sessionMode); + } + + @Override + public JMSContext createContext(final String userName, final String password) + { + return createContext(userName, password, JMSContext.AUTO_ACKNOWLEDGE); + } + + @Override + public JMSContext createContext(String userName, String password, int sessionMode) + { + validateSessionMode(sessionMode); + try + { + HornetQConnection connection = + createConnectionInternal(userName, password, false, HornetQConnection.TYPE_GENERIC_CONNECTION); + return connection.createContext(sessionMode); + } + catch (JMSSecurityException e) + { + throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + catch (JMSException e) + { + throw JmsExceptionUtils.convertToRuntimeException(e); + } + } + + /** + * @param mode + */ + private static void validateSessionMode(int mode) + { + switch (mode) + { + case JMSContext.AUTO_ACKNOWLEDGE: + case JMSContext.CLIENT_ACKNOWLEDGE: + case JMSContext.DUPS_OK_ACKNOWLEDGE: + case JMSContext.SESSION_TRANSACTED: + { + return; + } + default: + throw new JMSRuntimeException("Invalid Session Mode: " + mode); + } + } + + // 
QueueConnectionFactory implementation -------------------------------------------------------- + + public QueueConnection createQueueConnection() throws JMSException + { + return createQueueConnection(null, null); + } + + public QueueConnection createQueueConnection(final String username, final String password) throws JMSException + { + return createConnectionInternal(username, password, false, HornetQConnection.TYPE_QUEUE_CONNECTION); + } + + // TopicConnectionFactory implementation -------------------------------------------------------- + + public TopicConnection createTopicConnection() throws JMSException + { + return createTopicConnection(null, null); + } + + public TopicConnection createTopicConnection(final String username, final String password) throws JMSException + { + return createConnectionInternal(username, password, false, HornetQConnection.TYPE_TOPIC_CONNECTION); + } + + // XAConnectionFactory implementation ----------------------------------------------------------- + + public XAConnection createXAConnection() throws JMSException + { + return createXAConnection(null, null); + } + + public XAConnection createXAConnection(final String username, final String password) throws JMSException + { + return (XAConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_GENERIC_CONNECTION); + } + + @Override + public XAJMSContext createXAContext() + { + return createXAContext(null, null); + } + + @Override + public XAJMSContext createXAContext(String userName, String password) + { + try + { + HornetQConnection connection = + createConnectionInternal(userName, password, true, HornetQConnection.TYPE_GENERIC_CONNECTION); + return connection.createXAContext(); + } + catch (JMSSecurityException e) + { + throw new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e); + } + catch (JMSException e) + { + throw JmsExceptionUtils.convertToRuntimeException(e); + } + } + + // XAQueueConnectionFactory implementation 
------------------------------------------------------ + + public XAQueueConnection createXAQueueConnection() throws JMSException + { + return createXAQueueConnection(null, null); + } + + public XAQueueConnection createXAQueueConnection(final String username, final String password) throws JMSException + { + return (XAQueueConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_QUEUE_CONNECTION); + } + + // XATopicConnectionFactory implementation ------------------------------------------------------ + + public XATopicConnection createXATopicConnection() throws JMSException + { + return createXATopicConnection(null, null); + } + + public XATopicConnection createXATopicConnection(final String username, final String password) throws JMSException + { + return (XATopicConnection) createConnectionInternal(username, password, true, HornetQConnection.TYPE_TOPIC_CONNECTION); + } + + @Override + public Reference getReference() throws NamingException + { + return new Reference(this.getClass().getCanonicalName(), + new SerializableObjectRefAddr("HornetQ-CF", this), + ConnectionFactoryObjectFactory.class.getCanonicalName(), + null); + } + + // Public --------------------------------------------------------------------------------------- + + public boolean isHA() + { + return serverLocator.isHA(); + } + + public synchronized String getConnectionLoadBalancingPolicyClassName() + { + return serverLocator.getConnectionLoadBalancingPolicyClassName(); + } + + public synchronized void setConnectionLoadBalancingPolicyClassName(final String connectionLoadBalancingPolicyClassName) + { + checkWrite(); + serverLocator.setConnectionLoadBalancingPolicyClassName(connectionLoadBalancingPolicyClassName); + } + + public synchronized TransportConfiguration[] getStaticConnectors() + { + return serverLocator.getStaticTransportConfigurations(); + } + + public synchronized DiscoveryGroupConfiguration getDiscoveryGroupConfiguration() + { + return 
serverLocator.getDiscoveryGroupConfiguration(); + } + + public synchronized String getClientID() + { + return clientID; + } + + public synchronized void setClientID(final String clientID) + { + checkWrite(); + this.clientID = clientID; + } + + public synchronized int getDupsOKBatchSize() + { + return dupsOKBatchSize; + } + + public synchronized void setDupsOKBatchSize(final int dupsOKBatchSize) + { + checkWrite(); + this.dupsOKBatchSize = dupsOKBatchSize; + } + + public synchronized int getTransactionBatchSize() + { + return transactionBatchSize; + } + + public synchronized void setTransactionBatchSize(final int transactionBatchSize) + { + checkWrite(); + this.transactionBatchSize = transactionBatchSize; + } + + public synchronized long getClientFailureCheckPeriod() + { + return serverLocator.getClientFailureCheckPeriod(); + } + + public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod) + { + checkWrite(); + serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod); + } + + public synchronized long getConnectionTTL() + { + return serverLocator.getConnectionTTL(); + } + + public synchronized void setConnectionTTL(final long connectionTTL) + { + checkWrite(); + serverLocator.setConnectionTTL(connectionTTL); + } + + public synchronized long getCallTimeout() + { + return serverLocator.getCallTimeout(); + } + + public synchronized void setCallTimeout(final long callTimeout) + { + checkWrite(); + serverLocator.setCallTimeout(callTimeout); + } + + public synchronized long getCallFailoverTimeout() + { + return serverLocator.getCallFailoverTimeout(); + } + + public synchronized void setCallFailoverTimeout(final long callTimeout) + { + checkWrite(); + serverLocator.setCallFailoverTimeout(callTimeout); + } + + public synchronized int getConsumerWindowSize() + { + return serverLocator.getConsumerWindowSize(); + } + + public synchronized void setConsumerWindowSize(final int consumerWindowSize) + { + checkWrite(); + 
serverLocator.setConsumerWindowSize(consumerWindowSize); + } + + public synchronized int getConsumerMaxRate() + { + return serverLocator.getConsumerMaxRate(); + } + + public synchronized void setConsumerMaxRate(final int consumerMaxRate) + { + checkWrite(); + serverLocator.setConsumerMaxRate(consumerMaxRate); + } + + public synchronized int getConfirmationWindowSize() + { + return serverLocator.getConfirmationWindowSize(); + } + + public synchronized void setConfirmationWindowSize(final int confirmationWindowSize) + { + checkWrite(); + serverLocator.setConfirmationWindowSize(confirmationWindowSize); + } + + public synchronized int getProducerMaxRate() + { + return serverLocator.getProducerMaxRate(); + } + + public synchronized void setProducerMaxRate(final int producerMaxRate) + { + checkWrite(); + serverLocator.setProducerMaxRate(producerMaxRate); + } + + public synchronized int getProducerWindowSize() + { + return serverLocator.getProducerWindowSize(); + } + + public synchronized void setProducerWindowSize(final int producerWindowSize) + { + checkWrite(); + serverLocator.setProducerWindowSize(producerWindowSize); + } + + /** + * @param cacheLargeMessagesClient + */ + public synchronized void setCacheLargeMessagesClient(final boolean cacheLargeMessagesClient) + { + checkWrite(); + serverLocator.setCacheLargeMessagesClient(cacheLargeMessagesClient); + } + + public synchronized boolean isCacheLargeMessagesClient() + { + return serverLocator.isCacheLargeMessagesClient(); + } + + public synchronized int getMinLargeMessageSize() + { + return serverLocator.getMinLargeMessageSize(); + } + + public synchronized void setMinLargeMessageSize(final int minLargeMessageSize) + { + checkWrite(); + serverLocator.setMinLargeMessageSize(minLargeMessageSize); + } + + public synchronized boolean isBlockOnAcknowledge() + { + return serverLocator.isBlockOnAcknowledge(); + } + + public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge) + { + checkWrite(); + 
serverLocator.setBlockOnAcknowledge(blockOnAcknowledge); + } + + public synchronized boolean isBlockOnNonDurableSend() + { + return serverLocator.isBlockOnNonDurableSend(); + } + + public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend) + { + checkWrite(); + serverLocator.setBlockOnNonDurableSend(blockOnNonDurableSend); + } + + public synchronized boolean isBlockOnDurableSend() + { + return serverLocator.isBlockOnDurableSend(); + } + + public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend) + { + checkWrite(); + serverLocator.setBlockOnDurableSend(blockOnDurableSend); + } + + public synchronized boolean isAutoGroup() + { + return serverLocator.isAutoGroup(); + } + + public synchronized void setAutoGroup(final boolean autoGroup) + { + checkWrite(); + serverLocator.setAutoGroup(autoGroup); + } + + public synchronized boolean isPreAcknowledge() + { + return serverLocator.isPreAcknowledge(); + } + + public synchronized void setPreAcknowledge(final boolean preAcknowledge) + { + checkWrite(); + serverLocator.setPreAcknowledge(preAcknowledge); + } + + public synchronized long getRetryInterval() + { + return serverLocator.getRetryInterval(); + } + + public synchronized void setRetryInterval(final long retryInterval) + { + checkWrite(); + serverLocator.setRetryInterval(retryInterval); + } + + public synchronized long getMaxRetryInterval() + { + return serverLocator.getMaxRetryInterval(); + } + + public synchronized void setMaxRetryInterval(final long retryInterval) + { + checkWrite(); + serverLocator.setMaxRetryInterval(retryInterval); + } + + public synchronized double getRetryIntervalMultiplier() + { + return serverLocator.getRetryIntervalMultiplier(); + } + + public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier) + { + checkWrite(); + serverLocator.setRetryIntervalMultiplier(retryIntervalMultiplier); + } + + public synchronized int getReconnectAttempts() + { + return 
serverLocator.getReconnectAttempts(); + } + + public synchronized void setReconnectAttempts(final int reconnectAttempts) + { + checkWrite(); + serverLocator.setReconnectAttempts(reconnectAttempts); + } + + public synchronized void setInitialConnectAttempts(final int reconnectAttempts) + { + checkWrite(); + serverLocator.setInitialConnectAttempts(reconnectAttempts); + } + + public synchronized int getInitialConnectAttempts() + { + checkWrite(); + return serverLocator.getInitialConnectAttempts(); + } + + public synchronized boolean isFailoverOnInitialConnection() + { + return serverLocator.isFailoverOnInitialConnection(); + } + + public synchronized void setFailoverOnInitialConnection(final boolean failover) + { + checkWrite(); + serverLocator.setFailoverOnInitialConnection(failover); + } + + public synchronized boolean isUseGlobalPools() + { + return serverLocator.isUseGlobalPools(); + } + + public synchronized void setUseGlobalPools(final boolean useGlobalPools) + { + checkWrite(); + serverLocator.setUseGlobalPools(useGlobalPools); + } + + public synchronized int getScheduledThreadPoolMaxSize() + { + return serverLocator.getScheduledThreadPoolMaxSize(); + } + + public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize) + { + checkWrite(); + serverLocator.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize); + } + + public synchronized int getThreadPoolMaxSize() + { + return serverLocator.getThreadPoolMaxSize(); + } + + public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize) + { + checkWrite(); + serverLocator.setThreadPoolMaxSize(threadPoolMaxSize); + } + + public synchronized int getInitialMessagePacketSize() + { + return serverLocator.getInitialMessagePacketSize(); + } + + public synchronized void setInitialMessagePacketSize(final int size) + { + checkWrite(); + serverLocator.setInitialMessagePacketSize(size); + } + + public void setGroupID(final String groupID) + { + serverLocator.setGroupID(groupID); + } 
+ + public String getGroupID() + { + return serverLocator.getGroupID(); + } + + public boolean isCompressLargeMessage() + { + return serverLocator.isCompressLargeMessage(); + } + + public void setCompressLargeMessage(boolean avoidLargeMessages) + { + serverLocator.setCompressLargeMessage(avoidLargeMessages); + } + + public void close() + { + ServerLocator locator0 = serverLocator; + if (locator0 != null) + locator0.close(); + } + + public ServerLocator getServerLocator() + { + return serverLocator; + } + + public int getFactoryType() + { + return JMSFactoryType.CF.intValue(); + } + + // Package protected ---------------------------------------------------------------------------- + + // Protected ------------------------------------------------------------------------------------ + + protected synchronized HornetQConnection createConnectionInternal(final String username, + final String password, + final boolean isXA, + final int type) throws JMSException + { + readOnly = true; + + ClientSessionFactory factory; + + try + { + factory = serverLocator.createSessionFactory(); + } + catch (Exception e) + { + JMSException jmse = new JMSException("Failed to create session factory"); + + jmse.initCause(e); + jmse.setLinkedException(e); + + throw jmse; + } + + HornetQConnection connection = null; + + if (isXA) + { + if (type == HornetQConnection.TYPE_GENERIC_CONNECTION) + { + connection = new HornetQXAConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == HornetQConnection.TYPE_QUEUE_CONNECTION) + { + connection = + new HornetQXAConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == HornetQConnection.TYPE_TOPIC_CONNECTION) + { + connection = + new HornetQXAConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + } + else + { + if (type == 
HornetQConnection.TYPE_GENERIC_CONNECTION) + { + connection = new HornetQConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == HornetQConnection.TYPE_QUEUE_CONNECTION) + { + connection = + new HornetQConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + else if (type == HornetQConnection.TYPE_TOPIC_CONNECTION) + { + connection = + new HornetQConnection(username, + password, + type, + clientID, + dupsOKBatchSize, + transactionBatchSize, + factory); + } + } + + if (connection == null) + { + throw new JMSException("Failed to create connection: invalid type " + type); + } + connection.setReference(this); + + try + { + connection.authorize(); + } + catch (JMSException e) + { + try + { + connection.close(); + } + catch (JMSException me) + { + } + throw e; + } + + return connection; + } + + @Override + public String toString() + { + return "HornetQConnectionFactory [serverLocator=" + serverLocator + + ", clientID=" + + clientID + + ", consumerWindowSize = " + + getConsumerWindowSize() + + ", dupsOKBatchSize=" + + dupsOKBatchSize + + ", transactionBatchSize=" + + transactionBatchSize + + ", readOnly=" + + readOnly + + "]"; + } + + + // Private -------------------------------------------------------------------------------------- + + private void checkWrite() + { + if (readOnly) + { + throw new IllegalStateException("Cannot set attribute on HornetQConnectionFactory after it has been used"); + } + } + + @Override + protected void finalize() throws Throwable + { + try + { + serverLocator.close(); + } + catch (Exception e) + { + e.printStackTrace(); + //not much we can do here + } + super.finalize(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java ---------------------------------------------------------------------- diff 
--git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java new file mode 100644 index 0000000..80528b4 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContext.java @@ -0,0 +1,34 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +/** + * + */ +package org.apache.activemq6.jms.client; + +import javax.jms.JMSContext; +import javax.jms.XAJMSContext; + +/** + * Interface created to support reference counting all contexts using it. + * <p> + * Necessary to support {@code JMSContext.close()} conditions. 
+ * @see JMSContext + */ +public interface HornetQConnectionForContext extends javax.jms.Connection +{ + JMSContext createContext(int sessionMode); + + XAJMSContext createXAContext(); + + void closeFromContext(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java new file mode 100644 index 0000000..b77d08d --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionForContextImpl.java @@ -0,0 +1,91 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +/** + * + */ +package org.apache.activemq6.jms.client; + +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.JMSRuntimeException; +import javax.jms.Session; +import javax.jms.XAJMSContext; + +import org.apache.activemq6.api.jms.HornetQJMSConstants; +import org.apache.activemq6.utils.ReferenceCounter; +import org.apache.activemq6.utils.ReferenceCounterUtil; + +public abstract class HornetQConnectionForContextImpl implements HornetQConnectionForContext +{ + + final Runnable closeRunnable = new Runnable() + { + public void run() + { + try + { + close(); + } + catch (JMSException e) + { + throw JmsExceptionUtils.convertToRuntimeException(e); + } + } + }; + + final ReferenceCounter refCounter = new ReferenceCounterUtil(closeRunnable); + + protected final ThreadAwareContext threadAwareContext = new ThreadAwareContext(); + + public JMSContext createContext(int sessionMode) + { + switch (sessionMode) + { + case Session.AUTO_ACKNOWLEDGE: + case Session.CLIENT_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + case Session.SESSION_TRANSACTED: + case HornetQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: + case HornetQJMSConstants.PRE_ACKNOWLEDGE: + break; + default: + throw new JMSRuntimeException("Invalid ackmode: " + sessionMode); + } + refCounter.increment(); + + return new HornetQJMSContext(this, sessionMode, threadAwareContext); + } + + public XAJMSContext createXAContext() + { + refCounter.increment(); + + return new HornetQXAJMSContext(this, threadAwareContext); + } + + @Override + public void closeFromContext() + { + refCounter.decrement(); + } + + protected void incrementRefCounter() + { + refCounter.increment(); + } + + public ThreadAwareContext getThreadAwareContext() + { + return threadAwareContext; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java ---------------------------------------------------------------------- diff 
--git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java new file mode 100644 index 0000000..453cb9e --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQConnectionMetaData.java @@ -0,0 +1,97 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.jms.client;

import java.util.Enumeration;
import java.util.Vector;

import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;

import org.apache.activemq6.core.version.Version;

/**
 * HornetQ implementation of a JMS ConnectionMetaData.
 *
 * @author <a href="mailto:[email protected]">Tim Fox</a>
 * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a>
 *
 */
public class HornetQConnectionMetaData implements ConnectionMetaData
{
   /** Provider name reported by {@link #getJMSProviderName()}. */
   private static final String HORNETQ = "HornetQ";

   /** Source of the provider version values reported by this object. */
   private final Version serverVersion;

   /**
    * Builds the meta data around the given version.
    *
    * @param serverVersion version queried by the {@code getProvider*Version} methods
    */
   public HornetQConnectionMetaData(final Version serverVersion)
   {
      this.serverVersion = serverVersion;
   }

   /**
    * @return the constant {@code "2.0"}
    */
   public String getJMSVersion() throws JMSException
   {
      return "2.0";
   }

   /**
    * @return the constant {@code 2}
    */
   public int getJMSMajorVersion() throws JMSException
   {
      return 2;
   }

   /**
    * @return the constant {@code 0}
    */
   public int getJMSMinorVersion() throws JMSException
   {
      return 0;
   }

   /**
    * @return the constant {@code "HornetQ"}
    */
   public String getJMSProviderName() throws JMSException
   {
      return HORNETQ;
   }

   /**
    * @return the full version string of the wrapped {@link Version}
    */
   public String getProviderVersion() throws JMSException
   {
      return serverVersion.getFullVersion();
   }

   /**
    * @return the major version of the wrapped {@link Version}
    */
   public int getProviderMajorVersion() throws JMSException
   {
      return serverVersion.getMajorVersion();
   }

   /**
    * @return the minor version of the wrapped {@link Version}
    */
   public int getProviderMinorVersion() throws JMSException
   {
      return serverVersion.getMinorVersion();
   }

   /**
    * Lists {@code JMSXGroupID}, {@code JMSXGroupSeq} and {@code JMSXDeliveryCount}, in that
    * order. A new enumeration is built on every call.
    */
   public Enumeration getJMSXPropertyNames() throws JMSException
   {
      Vector<Object> names = new Vector<Object>();
      names.addElement("JMSXGroupID");
      names.addElement("JMSXGroupSeq");
      names.addElement("JMSXDeliveryCount");
      return names.elements();
   }
}
