http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java deleted file mode 100644 index 26db514..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/StateManager.java +++ /dev/null @@ -1,179 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hedwig.jms;

import org.slf4j.Logger;

import javax.jms.JMSException;


/**
 * Specific to the jms package - NOT to be used elsewhere.
 *
 * <p>This utility tracks the current readiness 'state' of the object which hosts it, along the axis of
 * {@link StateManager.State}. Right now, both Connection and Session make use of it.
 *
 * <p>The lockObject is used to do timed waits (which the host object will notify on) in case of async
 * state changes. This is not general purpose code; it is specific to the state transitions mentioned in
 * the jms spec.
 *
 * <p>All use of the class goes like this:
 * <pre>
 *   StateManager.State prevState;
 *   acquire lock:
 *     if in transition state, wait.
 *     if in expected state, return.
 *     if in error state, return/throw exception.
 *     if in valid state transition state -
 *        prevState = currentState.
 *        set to corresponding transition state (STARTING, CLOSING, etc).
 *     Other method specific changes.
 *   release lock:
 *
 *   nextState = prevState (in case state change failed, revert).
 *
 *   try {
 *     attempt state change.
 *     on success nextState = next valid state for this method.
 *   } finally {
 *     acquire lock:
 *       change state to nextState
 *     release lock:
 *   }
 * </pre>
 *
 * <ul>
 *   <li>So at any given point of time, the state will be in transition ONLY when there is an attempt
 *       being made to transition.</li>
 *   <li>The state will always be in a final state at all other points of time.</li>
 *   <li>No attempt will be made to change state while a transition state is currently in progress.</li>
 * </ul>
 */
final class StateManager {
    /** Default wait (milliseconds), overridable via the system property of the same name. */
    public static final long WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE =
        Long.getLong("WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE", 16000L);

    /** Lifecycle states; each one records whether it is start-like, close-like and/or transient. */
    enum State {
        STARTING(true, false, true),
        STARTED(true, false, false),
        STOPPING(false, false, true),
        STOPPED(false, false, false),
        CLOSING(false, true, true),
        CLOSED(false, true, false);

        private final boolean inStartMode;
        private final boolean inCloseMode;
        private final boolean inTransitionMode;

        State(boolean inStartMode, boolean inCloseMode, boolean inTransitionMode) {
            this.inStartMode = inStartMode;
            this.inCloseMode = inCloseMode;
            this.inTransitionMode = inTransitionMode;
        }

        public boolean isInStartMode() {
            return inStartMode;
        }

        public boolean isInCloseMode() {
            return inCloseMode;
        }

        public boolean isInTransitionMode() {
            return inTransitionMode;
        }
    }

    // DO NOT do something silly like State.STARTING == currentState || State.STARTED == currentState, etc !
    // (currentState is volatile : read it once per check, via the State flags.)
    private volatile State currentState;
    private final Object lockObject;

    /**
     * @param startState the initial state of the host object.
     * @param lockObject the monitor the host object synchronizes and notifies on.
     */
    StateManager(State startState, Object lockObject) {
        this.currentState = startState;
        this.lockObject = lockObject;
    }

    State getCurrentState() {
        return currentState;
    }

    boolean isStarted() {
        return State.STARTED == currentState;
    }

    boolean isInStartMode() {
        return currentState.isInStartMode();
    }

    boolean isStopped() {
        return State.STOPPED == currentState;
    }

    boolean isClosed() {
        return State.CLOSED == currentState;
    }

    // NOT locking explicitly : typically, already locked on lockObject
    boolean isInCloseMode() {
        return currentState.isInCloseMode();
    }

    // NOT locking explicitly : typically, already locked on lockObject
    boolean isTransitionState() {
        return currentState.isInTransitionMode();
    }

    void setCurrentState(State currentState) {
        this.currentState = currentState;
    }

    /**
     * Blocks until the current state is no longer a transition state, or until {@code timeout}
     * milliseconds have elapsed.
     *
     * <p>NOT locking explicitly : the caller MUST already hold the monitor of lockObject, otherwise
     * {@link Object#wait(long)} throws {@link IllegalMonitorStateException}.
     *
     * <p>The deadline is measured in elapsed time and the state is re-checked after every wake-up, so
     * (a) a transition that completes during the last wait slice is not reported as a timeout, and
     * (b) notifications or spurious wake-ups do not consume the timeout budget early.
     *
     * @param timeout maximum time to wait, in milliseconds.
     * @param logger used to dump stack traces (only when debug is enabled) before timing out.
     * @throws JMSException if the thread is interrupted (the interrupt flag is restored and the
     *         InterruptedException is linked), or if the state is still transient after the timeout.
     */
    void waitForTransientStateChange(long timeout, Logger logger) throws JMSException {
        // NOTE(review): assumes SessionImpl.currentTimeMillis() is a millisecond clock - confirm.
        final long startTime = SessionImpl.currentTimeMillis();
        final long waitUnitMillis = 100;

        while (isTransitionState()) {
            final long elapsed = SessionImpl.currentTimeMillis() - startTime;
            if (elapsed >= timeout) {
                if (logger.isDebugEnabled()) DebugUtil.dumpAllStacktraces(logger);
                throw new JMSException("wait for " + (SessionImpl.currentTimeMillis() - startTime) + " timeout");
            }

            // Never wait(0) (that would block forever) and never pass a negative/overflowed value :
            // fall back to the polling unit unless a shorter, positive remainder is left.
            final long remaining = timeout - elapsed;
            final long slice = (remaining > 0 && remaining < waitUnitMillis) ? remaining : waitUnitMillis;

            try {
                // If we are NOT locked on lockObject, this will throw exception !
                lockObject.wait(slice);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can observe it, then bubble it up.
                Thread.currentThread().interrupt();
                JMSException jex = new JMSException("Thread interrupted ... " + e);
                jex.setLinkedException(e);
                throw jex;
            }
        }
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        sb.append("StateManager");
        sb.append("{currentState=").append(currentState);
        sb.append('}');
        return sb.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java deleted file mode 100644 index 2e0cddd..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContext.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.jndi; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import javax.jms.ConnectionFactory; -import javax.naming.Name; -import javax.naming.NamingException; -import javax.naming.directory.InitialDirContext; -import java.util.Collections; -import java.util.HashSet; -import java.util.Hashtable; -import java.util.Set; - -/** - * Based (very very loosely) on - * <a href="http://docs.oracle.com/javase/1.3/docs/guide/jndi/spec/spi/jndispi.fm.html">jndi guide</a>. 
<br/> - * The InitialContext implementation clients should be using to get to our implementation. <br/> - * It is possible (by configuring via administrative means for example) to use a different DirContext - * to get to our provider implementation - * if the various classes exposed are the same as exposed via this DirContext.<br/> - * <p/> - * Ideally, the env property - * {@link javax.naming.Context.INITIAL_CONTEXT_FACTORY} "java.naming.factory.initial" is set to our factory - * {@link HedwigInitialContextFactory} classname which will return this InitialDirContext. - */ -public class HedwigInitialContext extends InitialDirContext { - - public static final String CONNECTION_FACTORY_NAME = "jms/ConnectionFactory"; - public static final String TOPIC_CONNECTION_FACTORY_NAME = "jms/TopicConnectionFactory"; - // public static final String QUEUE_CONNECTION_FACTORY_NAME = "jms/QueueConnectionFactory"; - - // Hardcoding to point to HedwigConnectionFactoryImpl by default. - private static final Set<String> defaultNamesMapping; - static { - Set<String> set = new HashSet<String>(8); - - // The actual name's for the various factories are bound by an admin. For convinence sake, - // we are providing default bindings. - - // The default connection - set.add("jms/ConnectionFactory"); - set.add("jms/TopicConnectionFactory"); - // Add in future - for now, we do not support it. - // set.add("jms/QueueConnectionFactory"); - - - set.add("ConnectionFactory"); - set.add("TopicConnectionFactory"); - // Add in future - for now, we do not support it. 
- // set.add("QueueConnectionFactory"); - defaultNamesMapping = Collections.unmodifiableSet(set); - } - - protected HedwigInitialContext(boolean lazy) throws NamingException { - super(lazy); - } - - public HedwigInitialContext() throws NamingException { - super(); - } - - public HedwigInitialContext(Hashtable<?, ?> environment) throws NamingException { - super(environment); - } - - private ConnectionFactory ourLookup(String name){ - if (defaultNamesMapping.contains(name)){ - return new HedwigConnectionFactoryImpl(); - } - - return null; - } - - @Override - public Object lookup(String name) throws NamingException { - ConnectionFactory factory = ourLookup(name); - if (null != factory) return factory; - - return super.lookup(name); - } - - @Override - public Object lookup(Name name) throws NamingException { - ConnectionFactory factory = ourLookup(name.toString()); - if (null != factory) return factory; - - return super.lookup(name); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java deleted file mode 100644 index b701aad..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/HedwigInitialContextFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.jndi; - -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; -import java.util.Hashtable; - -/** - * See HedwigInitialContext for more information. - */ -public class HedwigInitialContextFactory implements InitialContextFactory { - @Override - public Context getInitialContext(Hashtable<?, ?> environment) throws NamingException { - return new HedwigInitialContext(environment); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html deleted file mode 100644 index bab5787..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/jndi/package-info.html +++ /dev/null @@ -1,65 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
 You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - - -Contains some default shims to interface with JNDI - so that clients can use our JMS provider without -ANY code level ties.<br/> -The ideal way to use JNDI is to have an administrator configure JNDI such that a well-known JNDI name is -bound to "org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl" for the Topic and JMS connection factories. <br/> -<p/> -<p/> -Alternatively, if the user is NOT within an admin controlled JNDI environment, there are two other -common options : <br/> -<ul> - <li>Set the "java.naming.factory.initial" environment property to our Context factory - "org.apache.hedwig.jms.jndi.HedwigInitialContextFactory"</li> - <li>Directly instantiate the "org.apache.hedwig.jms.jndi.HedwigInitialContext" as a JNDI InitialContext - and pull the relevant factories via it.</li> -</ul> - -In either of these two cases, we define 6 well-known 'names' that users can use to pull the relevant -factories from the JNDI context; note that the two QueueConnectionFactory names are reserved for future use and are not currently bound by HedwigInitialContext. 
-<table border="1"> - <tr> - <th>JNDI name</th> - <th>Connection factory</th> - </tr> - <tr> - <td>jms/ConnectionFactory</td> - <td>ConnectionFactory</td> - </tr> - <tr> - <td>jms/TopicConnectionFactory</td> - <td>TopicConnectionFactory</td> - </tr> - <tr> - <td>jms/QueueConnectionFactory</td> - <td>QueueConnectionFactory</td> - </tr> - <tr> - <td>ConnectionFactory</td> - <td>ConnectionFactory</td> - </tr> - <tr> - <td>TopicConnectionFactory</td> - <td>TopicConnectionFactory</td> - </tr> - <tr> - <td>QueueConnectionFactory</td> - <td>QueueConnectionFactory</td> - </tr> -</table> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java deleted file mode 100644 index fb564dc..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/BytesMessageImpl.java +++ /dev/null @@ -1,657 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.jms.message; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.protocol.PubSubProtocol; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageEOFException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Map; - -/** - * To be used for raw bytes ... - */ -public class BytesMessageImpl extends MessageImpl implements BytesMessage { - private ReadOnlyMessage readOnlyMessage; - private WriteOnlyMessage writeOnlyMessage; - private boolean readMode; - - public BytesMessageImpl(SessionImpl session) throws JMSException { - super(session); - clearBody(); - } - - // To clone a message - public BytesMessageImpl(SessionImpl session, BytesMessageImpl message, String sourceTopicName, - String subscriberId) throws JMSException { - super(session, (MessageImpl) message, sourceTopicName, subscriberId); - try { - if (message.readMode){ - this.readOnlyMessage = new ReadOnlyMessage(message.readOnlyMessage.getDataCopy()); - this.writeOnlyMessage = null; - } - else { - this.readOnlyMessage = null; - this.writeOnlyMessage = new WriteOnlyMessage(message.writeOnlyMessage.getPayloadAsBytes()); - } - } catch (IOException e) { - JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e); - ex.setLinkedException(e); - throw ex; - } - this.readMode = message.readMode; - } - - // To clone a message from a BytesMessage which is NOT BytesMessageImpl - // Changing order of parameter to NOT accidentally clash with the constructor above. - // This is midly confusing, but helps a lot in preventing accidental bugs ! 
- public BytesMessageImpl(BytesMessage message, SessionImpl session) throws JMSException { - super((Message) message, session); - - if (message instanceof BytesMessageImpl) { - throw new JMSException("Coding bug - should use this constructor ONLY for non " + - "BytesMessageImpl messages"); - } - - // copy the bytes ... - final byte[] data; - { - final long length = message.getBodyLength(); - if (length < 0 || length >= Integer.MAX_VALUE) throw new JMSException("Unreasonably " + - "large value for body Length : " + length); - - data = new byte[(int) length]; - int read = 0; - while (read < length){ - int sz = message.readBytes(data, read); - read += sz; - } - } - - try { - this.writeOnlyMessage = new WriteOnlyMessage(data); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e); - ex.setLinkedException(e); - throw ex; - } - this.readOnlyMessage = null; - this.readMode = true; - } - - public BytesMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties, - String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException { - super(session, message, properties, sourceTopicName, subscriberId, ackRunnable); - - this.readOnlyMessage = new ReadOnlyMessage(message.getBody().toByteArray()); - this.writeOnlyMessage = null; - this.readMode = true; - } - - @Override - protected MessageUtil.SupportedMessageTypes getJmsMessageType() { - return MessageUtil.SupportedMessageTypes.BYTES; - } - - protected boolean isBodyEmpty(){ - return false; - } - - @Override - public PubSubProtocol.Message generateHedwigMessage() throws JMSException { - PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder(); - super.populateBuilderWithHeaders(builder); - - // Now set body and type. - try { - builder.setBody(ByteString.copyFrom(getPayloadData())); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to read message data .. 
" + e); - ex.setLinkedException(e); - throw ex; - } - - return builder.build(); - } - - @Override - public long getBodyLength() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - return readOnlyMessage.getBodyLength(); - } - - @Override - public boolean readBoolean() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readBoolean(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public byte readByte() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readByte(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public int readUnsignedByte() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readUnsignedByte(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public short readShort() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readShort(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public int readUnsignedShort() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readUnsignedShort(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - 
@Override - public char readChar() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readChar(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public int readInt() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readInt(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public long readLong() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readLong(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public float readFloat() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readFloat(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public double readDouble() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readDouble(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public String readUTF() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readUTF(); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw 
eofEx; - } - } - - @Override - public int readBytes(byte[] data) throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readBytes(data); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public int readBytes(byte[] data, int length) throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readBytes(data, length); - } catch (IOException eof){ - MessageEOFException eofEx = new MessageEOFException("eof ?"); - eofEx.setLinkedException(eof); - throw eofEx; - } - } - - @Override - public void writeBoolean(boolean val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeBoolean(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeByte(byte val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeByte(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeShort(short val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeShort(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeChar(char val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { 
- writeOnlyMessage.writeChar(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeInt(int val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeInt(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeLong(long val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeLong(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeFloat(float val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeFloat(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeDouble(double val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeDouble(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeUTF(String val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeUTF(val); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - 
eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeBytes(byte[] data) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeBytes(data); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeBytes(byte[] data, int offset, int length) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeBytes(data, offset, length); - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - // This method is ONLY supposed to be used for object form of primitive types ! - @Override - public void writeObject(Object obj) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - // unrolling it - if (obj instanceof Boolean) { - writeOnlyMessage.writeBoolean((Boolean) obj); - } - else if (obj instanceof Byte) { - writeOnlyMessage.writeByte((Byte) obj); - } - else if (obj instanceof Short) { - writeOnlyMessage.writeShort((Short) obj); - } - else if (obj instanceof Character) { - writeOnlyMessage.writeChar((Character) obj); - } - else if (obj instanceof Integer) { - writeOnlyMessage.writeInt((Integer) obj); - } - else if (obj instanceof Long) { - writeOnlyMessage.writeLong((Long) obj); - } - else if (obj instanceof Float) { - writeOnlyMessage.writeFloat((Float) obj); - } - else if (obj instanceof Double) { - writeOnlyMessage.writeDouble((Double) obj); - } - else if (obj instanceof String) { - writeOnlyMessage.writeUTF((String) obj); - } - else if (obj instanceof byte[]) { - writeOnlyMessage.writeBytes((byte[]) obj); - } - else{ - throw new JMSException("Unsupported type for obj : " 
+ obj.getClass()); - } - } catch (IOException ioEx){ - MessageEOFException eofEx = new MessageEOFException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void reset() throws JMSException { - if (this.readMode) return ; - try { - this.readOnlyMessage = new ReadOnlyMessage(writeOnlyMessage.getPayloadAsBytes()); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to convert write-only message to read-only message .. " + e); - ex.setLinkedException(e); - throw ex; - } - this.readMode = true; - this.writeOnlyMessage = null; - } - - @Override - public void clearBody() throws JMSException { - super.clearBody(); - this.writeOnlyMessage = new WriteOnlyMessage(); - this.readOnlyMessage = null; - this.readMode = false; - } - - private byte[] getPayloadData() throws IOException { - if (readMode) return readOnlyMessage.getDataCopy(); - return writeOnlyMessage.getPayloadAsBytes(); - } - - @Override - BytesMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) - throws JMSException { - return new BytesMessageImpl(session, this, sourceTopicName, subscriberId); - } - - // Using java object's instead of primitives to avoid having to store schema separately. 
- private static class ReadOnlyMessage { - - private final DataInputStream dis; - private final byte[] data; - - public ReadOnlyMessage(byte[] data) { - this.dis = new DataInputStream(new ByteArrayInputStream(data)); - this.data = data; - } - - public byte[] getDataCopy(){ - byte[] retval = new byte[data.length]; - System.arraycopy(data, 0, retval, 0, retval.length); - return retval; - } - - public int getBodyLength() { - return data.length; - } - - public boolean readBoolean() throws IOException { - return dis.readBoolean(); - } - - public byte readByte() throws IOException { - return dis.readByte(); - } - - public int readUnsignedByte() throws IOException { - return dis.readUnsignedByte(); - } - - public short readShort() throws IOException { - return dis.readShort(); - } - - public int readUnsignedShort() throws IOException { - return dis.readUnsignedShort(); - } - - public char readChar() throws IOException { - return dis.readChar(); - } - - public int readInt() throws IOException { - return dis.readInt(); - } - - public long readLong() throws IOException { - return dis.readLong(); - } - - public float readFloat() throws IOException { - return dis.readFloat(); - } - - public double readDouble() throws IOException { - return dis.readDouble(); - } - - public String readUTF() throws IOException { - return dis.readUTF(); - } - - public int readBytes(byte[] data) throws IOException { - return dis.read(data); - } - - public int readBytes(byte[] data, int length) throws IOException { - if (length < 0 || length > data.length) - throw new IndexOutOfBoundsException("Invalid length specified : " + length + ", data : " + data.length); - return dis.read(data, 0, length); - } - } - - private static class WriteOnlyMessage { - - private final ByteArrayOutputStream baos; - private final DataOutputStream dos; - - public WriteOnlyMessage(){ - baos = new ByteArrayOutputStream(); - dos = new DataOutputStream(baos); - } - - public WriteOnlyMessage(byte[] data) throws IOException { - 
baos = new ByteArrayOutputStream(); - dos = new DataOutputStream(baos); - dos.write(data); - } - - public byte[] getPayloadAsBytes() throws IOException { - dos.flush(); - return baos.toByteArray(); - } - - public void writeBoolean(boolean val) throws IOException { - dos.writeBoolean(val); - } - - public void writeByte(byte val) throws IOException { - dos.writeByte(val); - } - - public void writeShort(short val) throws IOException { - dos.writeShort(val); - } - - public void writeChar(char val) throws IOException { - dos.writeChar(val); - } - - public void writeInt(int val) throws IOException { - dos.writeInt(val); - } - - public void writeLong(long val) throws IOException { - dos.writeLong(val); - } - - public void writeFloat(float val) throws IOException { - dos.writeFloat(val); - } - - public void writeDouble(double val) throws IOException { - dos.writeDouble(val); - } - - public void writeUTF(String val) throws IOException { - dos.writeUTF(val); - } - - public void writeBytes(byte[] data) throws IOException { - dos.write(data); - } - - public void writeBytes(byte[] data, int offset, int length) throws IOException { - dos.write(data, offset, length); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java deleted file mode 100644 index af806fb..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MapMessageImpl.java +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.message; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.protocol.PubSubProtocol; - -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageNotWriteableException; -import java.io.IOException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * There is a weaker expectation of ordering and strong expectation of <key, value> container for data. - */ -public class MapMessageImpl extends MessageImpl implements MapMessage { - private final Map<String, Object> payload = new LinkedHashMap<String, Object>(4); - private boolean readMode; - - public MapMessageImpl(SessionImpl session) throws JMSException { - super(session); - clearBody(); - } - - public MapMessageImpl(SessionImpl session, MapMessageImpl message, String sourceTopicName, - String subscriberId) throws JMSException { - super(session, (MessageImpl) message, sourceTopicName, subscriberId); - this.payload.putAll(message.payload); - this.readMode = message.readMode; - } - - // To clone a message from a MapMessage which is NOT MapMessageImpl - // Changing order of parameter to NOT accidentally clash with the constructor above. - // This is midly confusing, but helps a lot in preventing accidental bugs ! 
- public MapMessageImpl(MapMessage message, SessionImpl session) throws JMSException { - super((Message) message, session); - - if (message instanceof MapMessageImpl) { - throw new JMSException("Coding bug - should use this constructor ONLY for non MapMessageImpl messages"); - } - - - Enumeration keys = message.getMapNames(); - while (keys.hasMoreElements()){ - Object key = keys.nextElement(); - if (!(key instanceof String)) - throw new JMSException("Unsupported type (expected String) for key : " + key); - - String skey = (String) key; - this.payload.put(skey, message.getObject(skey)); - } - this.readMode = false; - } - - @SuppressWarnings("unchecked") - public MapMessageImpl(SessionImpl session, PubSubProtocol.Message message, - Map<String, Object> properties, String sourceTopicName, String subscriberId, - Runnable ackRunnable) throws JMSException { - super(session, message, properties, sourceTopicName, subscriberId, ackRunnable); - try { - this.payload.putAll((Map<String, Object>) MessageUtil.bytesToObject(message.getBody().toByteArray())); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to read message data .. " + e); - ex.setLinkedException(e); - throw ex; - } - this.readMode = true; - } - - @Override - protected MessageUtil.SupportedMessageTypes getJmsMessageType() { - return MessageUtil.SupportedMessageTypes.MAP; - } - - protected boolean isBodyEmpty(){ - return false; - } - - @Override - public PubSubProtocol.Message generateHedwigMessage() throws JMSException { - PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder(); - super.populateBuilderWithHeaders(builder); - - // Now set body and type. - try { - builder.setBody(ByteString.copyFrom(MessageUtil.objectToBytes(this.payload))); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to read message data .. 
" + e); - ex.setLinkedException(e); - throw ex; - } - - return builder.build(); - } - - @Override - public boolean getBoolean(String name) throws JMSException { - return MessageUtil.asBoolean(payload.get(name)); - } - - @Override - public byte getByte(String name) throws JMSException { - return MessageUtil.asByte(payload.get(name)); - } - - @Override - public short getShort(String name) throws JMSException { - return MessageUtil.asShort(payload.get(name)); - } - - @Override - public char getChar(String name) throws JMSException { - return MessageUtil.asChar(payload.get(name)); - } - - @Override - public int getInt(String name) throws JMSException { - return MessageUtil.asInteger(payload.get(name)); - } - - @Override - public long getLong(String name) throws JMSException { - return MessageUtil.asLong(payload.get(name)); - } - - @Override - public float getFloat(String name) throws JMSException { - return MessageUtil.asFloat(payload.get(name)); - } - - @Override - public double getDouble(String name) throws JMSException { - return MessageUtil.asDouble(payload.get(name)); - } - - @Override - public String getString(String name) throws JMSException { - return MessageUtil.asString(payload.get(name)); - } - - @Override - public byte[] getBytes(String name) throws JMSException { - return MessageUtil.asBytes(payload.get(name)); - } - - @Override - public Object getObject(String name) throws JMSException { - return payload.get(name); - } - - @Override - public Enumeration getMapNames() throws JMSException { - return Collections.enumeration(payload.keySet()); - } - - @Override - public void setBoolean(String name, boolean value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - } - - @Override - public void setByte(String name, byte value) throws JMSException { - if (readMode) throw new 
MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setShort(String name, short value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setChar(String name, char value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setInt(String name, int value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setLong(String name, long value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setFloat(String name, float value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setDouble(String name, double value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setString(String name, 
String value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setBytes(String name, byte[] value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setBytes(String name, byte[] value, int i, int i1) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public void setObject(String name, Object value) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - if (!MessageUtil.isValidKey(name)) throw new IllegalArgumentException("Invalid key " + name); - payload.put(name, value); - - } - - @Override - public boolean itemExists(String name) throws JMSException { - return payload.containsKey(name); - } - - @Override - public void clearBody() throws JMSException { - super.clearBody(); - // allow read and write. 
- this.payload.clear(); - this.readMode = false; - } - - @Override - public void reset() throws JMSException { - if (this.readMode) return ; - this.readMode = true; - } - - @Override - MapMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException { - return new MapMessageImpl(session, this, sourceTopicName, subscriberId); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java deleted file mode 100644 index d13feb0..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageImpl.java +++ /dev/null @@ -1,872 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.jms.message; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.message.header.MetadataProcessor; -import org.apache.hedwig.jms.selector.SelectorEvaluationException; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -/** - * Implementation of a message. - */ -public class MessageImpl implements Message { - - // This is of type byte for now - enough ? - public static final String JMS_MESSAGE_TYPE_KEY = "jmsBodyType"; - // 'others' (non-jms clients) can depend on this boolean metadata property : for now, part - // of jms values directly due to how metadata is being designed ! 
- // sigh :-( - public static final String EMPTY_BODY_KEY = "bodyEmpty"; - - - private final static Logger logger = LoggerFactory.getLogger(MessageImpl.class); - - public static final String JMS_MESSAGE_ID = "JMSMessageID"; - public static final String JMS_TIMESTAMP = "JMSTimestamp"; - public static final String JMS_CORRELATION_ID = "JMSCorrelationID"; - public static final String JMS_REPLY_TO = "JMSReplyTo"; - public static final String JMS_DESTINATION = "JMSDestination"; - public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode"; - public static final String JMS_REDELIVERED = "JMSRedelivered"; - public static final String JMS_TYPE = "JMSType"; - public static final String JMS_EXPIRATION = "JMSExpiration"; - public static final String JMS_PRIORITY = "JMSPriority"; - - private static final Set<String> standardProperties; - static { - Set<String> set = new HashSet<String>(16); - set.add(JMS_MESSAGE_ID); - set.add(JMS_TIMESTAMP); - set.add(JMS_CORRELATION_ID); - - set.add(JMS_REPLY_TO); - set.add(JMS_DESTINATION); - set.add(JMS_DELIVERY_MODE); - - // Currently simulated in provider - NOT from hedwig. - set.add(JMS_REDELIVERED); - set.add(JMS_TYPE); - - set.add(JMS_EXPIRATION); - set.add(JMS_PRIORITY); - - standardProperties = Collections.unmodifiableSet(set); - } - - private final SessionImpl session; - private final String serverJmsMessageId; - - private String jmsMessageId; - private long jmsTimestamp = 0; - private String jmsCorrelationID; - - private Destination jmsReplyTo; - private Destination jmsDestination; - private int jmsDeliveryMode = DeliveryMode.PERSISTENT; - - private boolean jmsRedelivered; - private String jmsType; - - private long jmsExpiration = 0L; - private int jmsPriority = Message.DEFAULT_PRIORITY; - - // Note: this DOES NOT contain standard headers - which are explicitly handled. - private boolean propertiesReadOnly = false; - protected Map<String, Object> properties = new HashMap<String, Object>(4); - - // key == standard property. 
- private Set<String> standardPropertiesExists = new HashSet<String>(16); - private Set<String> standardPropertiesExistsForWire = new HashSet<String>(16); - - private final String sourceName; - private final String subscriberId; - - private final Runnable ackRunnable; - - // This is to be set to true ONLY for testing - NOT otherwise ! - // The JMS api DOES NOT expose this ... - // private boolean allowSpecifyJMSMessageIDForTest; - - //private final PubSubProtocol.Message rawMessage; - - public MessageImpl(SessionImpl session){ - this.session = session; - - this.sourceName = null; - this.subscriberId = null; - this.ackRunnable = null; - this.serverJmsMessageId = null; - // this.rawMessage = null; - } - - MessageImpl(SessionImpl session, MessageImpl message, String sourceName, String subscriberId) - throws JMSException { - this.session = session; - this.sourceName = sourceName; - this.subscriberId = subscriberId; - this.ackRunnable = message.getAckRunnable(); - this.serverJmsMessageId = message.getServerJmsMessageId(); - // this.rawMessage = null; - - // Copy all properties from message to this class. - - this.properties.putAll(message.properties); - - // Now copy rest of the state over ... - if (message.propertyExists(JMS_MESSAGE_ID)) setJMSMessageIDInternal(message.getJMSMessageID()); - if (message.propertyExists(JMS_TIMESTAMP)) setJMSTimestamp(message.getJMSTimestamp()); - if (message.propertyExists(JMS_CORRELATION_ID)) setJMSCorrelationID(message.getJMSCorrelationID()); - // We do not support this right now. 
- // if (message.propertyExists(JMS_CORRELATION_ID_AS_BYTES)) - // setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes()); - if (message.propertyExists(JMS_REPLY_TO)) setJMSReplyTo(message.getJMSReplyTo()); - if (message.propertyExists(JMS_DESTINATION)) setJMSDestination(message.getJMSDestination()); - if (message.propertyExists(JMS_DELIVERY_MODE)) setJMSDeliveryMode(message.getJMSDeliveryMode()); - if (message.propertyExists(JMS_REDELIVERED)) setJMSRedelivered(message.getJMSRedelivered()); - if (message.propertyExists(JMS_TYPE)) setJMSType(message.getJMSType()); - if (message.propertyExists(JMS_EXPIRATION)) setJMSExpiration(message.getJMSExpiration()); - if (message.propertyExists(JMS_PRIORITY)) setJMSPriority(message.getJMSPriority()); - - this.propertiesReadOnly = message.propertiesReadOnly; - } - - // To clone a message from a Message which is NOT MessageImpl - // Changing order of parameter to NOT accidentally clash with the constructor above. - // This is midly confusing, but helps a lot in preventing accidental bugs ! - MessageImpl(Message message, SessionImpl session) throws JMSException { - this.session = session; - this.sourceName = null; - this.subscriberId = null; - this.ackRunnable = null; - this.serverJmsMessageId = null; - // this.rawMessage = null; - - assert (! (message instanceof MessageImpl )); - - // Copy all properties from message to this class. - Enumeration names = message.getPropertyNames(); - while (names.hasMoreElements()){ - Object name = names.nextElement(); - if (!(name instanceof String)) - throw new JMSException("Unsupported type (expected String) for key : " + name); - - String sname = (String) name; - this.properties.put(sname, message.getObjectProperty(sname)); - } - - // Now copy rest of the state over ... - // JMS VIOLATION: we will be unable to check for propertyExists after this, - // at sender and receiver side ... 
sigh :-( - setJMSMessageIDInternal(message.getJMSMessageID()); - setJMSTimestamp(message.getJMSTimestamp()); - setJMSCorrelationID(message.getJMSCorrelationID()); - // We do not support this right now. - // setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes()); - setJMSReplyTo(message.getJMSReplyTo()); - setJMSDestination(message.getJMSDestination()); - setJMSDeliveryMode(message.getJMSDeliveryMode()); - setJMSRedelivered(message.getJMSRedelivered()); - setJMSType(message.getJMSType()); - setJMSExpiration(message.getJMSExpiration()); - setJMSPriority(message.getJMSPriority()); - - // Should be able to modify, right ? - this.propertiesReadOnly = false; - - // remove all jms standard keys from properties now : this should ideally result in zero - // removals ... but we never know with client code ! - for (String key : standardProperties) properties.remove(key); - } - - MessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties, - String sourceName, String subscriberId, Runnable ackRunnable) throws JMSException { - this.session = session; - this.sourceName = sourceName; - this.subscriberId = subscriberId; - this.ackRunnable = ackRunnable; - // this.rawMessage = message; - - // setJMSMessageID(getStringProperty(properties, JMS_MESSAGE_ID)); - setJMSMessageIDInternal(MessageUtil.generateJMSMessageIdFromSeqId(message.getMsgId())); - this.serverJmsMessageId = getJMSMessageID(); - - if (properties.containsKey(JMS_TIMESTAMP)) setJMSTimestamp(getLongProperty(properties, JMS_TIMESTAMP)); - if (properties.containsKey(JMS_CORRELATION_ID)) setJMSCorrelationID( - getStringProperty(properties, JMS_CORRELATION_ID)); - if (null != getStringProperty(properties, JMS_REPLY_TO)) { - setJMSReplyTo( - session.getDestination(session.findDestinationType(getStringProperty(properties, JMS_REPLY_TO)), - getStringProperty(properties, JMS_REPLY_TO) - )); - } - if (null != getStringProperty(properties, JMS_DESTINATION)) { - setJMSDestination( - 
session.getDestination(session.findDestinationType( - getStringProperty(properties, JMS_DESTINATION)), - getStringProperty(properties, JMS_DESTINATION) - )); - } - - if (properties.containsKey(JMS_DELIVERY_MODE)) setJMSDeliveryMode( - getIntProperty(properties, JMS_DELIVERY_MODE)); - if (properties.containsKey(JMS_TYPE)) setJMSType(getStringProperty(properties, JMS_TYPE)); - - if (properties.containsKey(JMS_EXPIRATION)) setJMSExpiration( - getLongProperty(properties, JMS_EXPIRATION)); - if (properties.containsKey(JMS_PRIORITY)) setJMSPriority( - getIntProperty(properties, JMS_PRIORITY)); - - - // remove all jms standard keys from properties now : this should result in zero removals ... - // but adding anyway. - for (String key : standardProperties) properties.remove(key); - - // Immutable after reading from stream ! - this.propertiesReadOnly = true; - this.properties.putAll(properties); - } - - protected MessageUtil.SupportedMessageTypes getJmsMessageType(){ - // Validate against coding bug ... this MUST be overridden in all subclasses. - if (getClass() != MessageImpl.class) - throw new IllegalStateException("This method must be overridden by subclasses. class : " + getClass()); - return MessageUtil.SupportedMessageTypes.ONLY_MESSAGE; - } - - public PubSubProtocol.Message generateHedwigMessage() throws JMSException { - // This is to be called ONLY from the base class - all children MUST override it and NOT delegate to it. - if (getClass() != MessageImpl.class) { - throw new JMSException("Unexpected to call MessageImpl's generateHedwigMessage from subclass " + - getClass()); - } - - PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder(); - populateBuilderWithHeaders(builder); - // no body - will be appropriately set in populateBuilderWithHeaders(). 
- return builder.build(); - } - - protected boolean isBodyEmpty(){ - return true; - } - - /* - protected void markEmptyBody(PubSubProtocol.Message.Builder builder) { - MetadataProcessor.addBooleanProperty(builder, EMPTY_BODY_KEY, true); - builder.setBody(ByteString.EMPTY); - } - */ - - protected boolean hasBodyFromProperties() { - // if key missing (common case), then there is body. - if (!properties.containsKey(EMPTY_BODY_KEY)) return true; - // If present, then check if it is a boolean of value true. - Object value = properties.get(EMPTY_BODY_KEY); - - // special case null. - if (null == value) return true; - if (value instanceof Boolean) return ! (Boolean) value; - - // unknown type ... - logger.info("Unknown type for value of " + EMPTY_BODY_KEY + " in message properties : " + value); - // assume true by default. - return true; - } - - - protected final void populateBuilderWithHeaders(PubSubProtocol.Message.Builder builder) throws JMSException { - - Map<String, Object> propertiesCopy = new HashMap<String, Object>(properties); - if (isBodyEmpty()) { - propertiesCopy.put(EMPTY_BODY_KEY, true); - builder.setBody(ByteString.EMPTY); - } - // Not setting unless required to reduce message size - change this ? - // else propertiesCopy.put(EMPTY_BODY_KEY, false); - - Iterator<Map.Entry<String, Object>> iter = propertiesCopy.entrySet().iterator(); - while (iter.hasNext()){ - Map.Entry<String, Object> entry = iter.next(); - if (standardProperties.contains(entry.getKey())) { - if (logger.isInfoEnabled()) - logger.info("Ignoring user attempt to set standard property as application property : " + entry); - iter.remove(); - } - } - - - // set jms message type. - propertiesCopy.put(JMS_MESSAGE_TYPE_KEY, getJmsMessageType().getType()); - if (standardPropertiesExistsForWire.contains(JMS_CORRELATION_ID)) - propertiesCopy.put(JMS_CORRELATION_ID, getJMSCorrelationID()); - - // unsupported for now. 
- // if (standardPropertiesExistsForWire.contains(JMS_CORRELATION_ID_AS_BYTES)) - // propertiesCopy.put(JMS_CORRELATION_ID_AS_BYTES, getJMSCorrelationIDAsBytes()); - if (standardPropertiesExistsForWire.contains(JMS_DELIVERY_MODE)) - propertiesCopy.put(JMS_DELIVERY_MODE, getJMSDeliveryMode()); - - if (standardPropertiesExistsForWire.contains(JMS_DESTINATION)) - propertiesCopy.put(JMS_DESTINATION, session.toName(getJMSDestination())); - if (standardPropertiesExistsForWire.contains(JMS_EXPIRATION)) - propertiesCopy.put(JMS_EXPIRATION, getJMSExpiration()); - - // This can be set by client - but we ignore it in hedwig. - // if (standardPropertiesExistsForWire.contains(JMS_MESSAGE_ID)) - // propertiesCopy.put(JMS_MESSAGE_ID, getJMSMessageID()); - - // We do not support priority - but we are gong to allow it to be specified : this is - // for selectors to set conditions on it ! - if (standardPropertiesExistsForWire.contains(JMS_PRIORITY)) - propertiesCopy.put(JMS_PRIORITY, getJMSPriority()); - - // this is not to be sent to hedwig. - // if (standardPropertiesExistsForWire.contains(JMS_REDELIVERED)) - // propertiesCopy.put(JMS_REDELIVERED, getJMSRedelivered()); - - if (standardPropertiesExistsForWire.contains(JMS_REPLY_TO)) - propertiesCopy.put(JMS_REPLY_TO, session.toName(getJMSReplyTo())); - - - propertiesCopy.put(JMS_TIMESTAMP, getJMSTimestamp()); - if (standardPropertiesExistsForWire.contains(JMS_TYPE)) propertiesCopy.put(JMS_TYPE, getJMSType()); - - - MetadataProcessor.addHeaders(builder, propertiesCopy); - } - - @Override - public String getJMSMessageID() { - return jmsMessageId; - } - - @Override - public void setJMSMessageID(String jmsMessageId) throws JMSException { - // JMS VIOLATION ... we are NOT allowing client to override jms message-id. - // if (!allowSpecifyJMSMessageIDForTest) - // throw new JMSException("We do not allow setting jms message id. 
This will be ignored by hedwig anyway."); - if (logger.isDebugEnabled()) logger.debug("Setting this is irrelevant - we override it anyway - " + - " hedwig does not allow specifying it explictly."); - setJMSMessageIDInternal(jmsMessageId); - } - - public void setJMSMessageIDInternal(String jmsMessageId) throws JMSException { - this.jmsMessageId = jmsMessageId; - if (null != jmsMessageId){ - // We do not allow sending the property over wire. - this.standardPropertiesExists.add(JMS_MESSAGE_ID); - // this.standardPropertiesExistsForWire.add(JMS_MESSAGE_ID); - } - else { - this.standardPropertiesExists.remove(JMS_MESSAGE_ID); - // this.standardPropertiesExistsForWire.remove(JMS_MESSAGE_ID); - } - } - - // The immutable message Id set by the server. - public String getServerJmsMessageId() { - return serverJmsMessageId; - } - - @Override - public long getJMSTimestamp() { - return jmsTimestamp; - } - - @Override - public void setJMSTimestamp(long jmsTimestamp) { - this.jmsTimestamp = jmsTimestamp; - this.standardPropertiesExists.add(JMS_TIMESTAMP); - // this.standardPropertiesExistsForWire.add(JMS_TIMESTAMP); - } - - @Override - public byte[] getJMSCorrelationIDAsBytes() { - throw new UnsupportedOperationException("unsupported"); - } - - @Override - public void setJMSCorrelationIDAsBytes(byte[] bytes) { - throw new UnsupportedOperationException("unsupported"); - } - - @Override - public void setJMSCorrelationID(String jmsCorrelationID) { - this.jmsCorrelationID = jmsCorrelationID; - if (null != jmsCorrelationID){ - this.standardPropertiesExists.add(JMS_CORRELATION_ID); - this.standardPropertiesExistsForWire.add(JMS_CORRELATION_ID); - } - else { - this.standardPropertiesExists.remove(JMS_CORRELATION_ID); - this.standardPropertiesExistsForWire.remove(JMS_CORRELATION_ID); - } - } - - @Override - public String getJMSCorrelationID() { - return jmsCorrelationID; - } - - @Override - public Destination getJMSReplyTo() { - return jmsReplyTo; - } - - @Override - public void 
setJMSReplyTo(Destination jmsReplyTo) { - this.jmsReplyTo = jmsReplyTo; - if (null != jmsReplyTo){ - this.standardPropertiesExists.add(JMS_REPLY_TO); - this.standardPropertiesExistsForWire.add(JMS_REPLY_TO); - } - else { - this.standardPropertiesExists.remove(JMS_REPLY_TO); - this.standardPropertiesExistsForWire.remove(JMS_REPLY_TO); - } - } - - @Override - public Destination getJMSDestination() { - return jmsDestination; - } - - @Override - public void setJMSDestination(Destination jmsDestination) { - this.jmsDestination = jmsDestination; - if (null != jmsDestination){ - this.standardPropertiesExists.add(JMS_DESTINATION); - this.standardPropertiesExistsForWire.add(JMS_DESTINATION); - } - else { - this.standardPropertiesExists.remove(JMS_DESTINATION); - this.standardPropertiesExistsForWire.remove(JMS_DESTINATION); - } - } - - @Override - public int getJMSDeliveryMode() { - return jmsDeliveryMode; - } - - @Override - public void setJMSDeliveryMode(int jmsDeliveryMode) { - this.jmsDeliveryMode = jmsDeliveryMode; - this.standardPropertiesExists.add(JMS_DELIVERY_MODE); - this.standardPropertiesExistsForWire.add(JMS_DELIVERY_MODE); - } - - @Override - public boolean getJMSRedelivered() { - return jmsRedelivered; - } - - @Override - public void setJMSRedelivered(boolean jmsRedelivered) { - this.jmsRedelivered = jmsRedelivered; - this.standardPropertiesExists.add(JMS_REDELIVERED); - // this.standardPropertiesExistsForWire.add(JMS_REDELIVERED); - } - - @Override - public String getJMSType() { - return jmsType; - } - - @Override - public void setJMSType(String jmsType) { - this.jmsType = jmsType; - if (null != jmsType){ - this.standardPropertiesExists.add(JMS_TYPE); - this.standardPropertiesExistsForWire.add(JMS_TYPE); - } - else { - this.standardPropertiesExists.remove(JMS_TYPE); - this.standardPropertiesExistsForWire.remove(JMS_TYPE); - } - } - - @Override - public long getJMSExpiration() { - return jmsExpiration; - } - - @Override - public void setJMSExpiration(long 
jmsExpiration) { - // We simulate it now ! - // if (logger.isInfoEnabled()) logger.info("JMSExpiration is not supported right now by Hedwig ..."); - this.jmsExpiration = jmsExpiration; - - if (0 != jmsExpiration){ - this.standardPropertiesExists.add(JMS_EXPIRATION); - this.standardPropertiesExistsForWire.add(JMS_EXPIRATION); - } - else { - this.standardPropertiesExists.remove(JMS_EXPIRATION); - this.standardPropertiesExistsForWire.remove(JMS_EXPIRATION); - } - } - - @Override - public int getJMSPriority() { - return jmsPriority; - } - - @Override - public void setJMSPriority(int jmsPriority) { - this.jmsPriority = jmsPriority; - this.standardPropertiesExists.add(JMS_PRIORITY); - // Sent over wire ? - this.standardPropertiesExistsForWire.add(JMS_PRIORITY); - } - - @Override - public void clearProperties() { - this.propertiesReadOnly = false; - properties.clear(); - } - - /** - * JMS VIOLATION ? The spec & javadoc is unclear as to whether this method must include jms - * standard properties or not. - * But going by javadoc of - * @see #getPropertyNames() , we have this specified : - * "Note that JMS standard header fields are not considered properties and are not returned - * in this enumeration." - * Which indicates this method must not include standard properties. - */ - @Override - public boolean propertyExists(String key) { - if (!standardProperties.contains(key)) return properties.containsKey(key); - - // Evaluate depending on type of property. 
- return standardPropertiesExists.contains(key); - } - - @Override - public boolean getBooleanProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getBooleanProperty(properties, key); - } - - private boolean getBooleanProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asBoolean(properties.get(key)); - } - - @Override - public byte getByteProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getByteProperty(properties, key); - } - - private byte getByteProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asByte(properties.get(key)); - } - - @Override - public short getShortProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getShortProperty(properties, key); - } - - private short getShortProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asShort(properties.get(key)); - } - - @Override - public int getIntProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getIntProperty(properties, key); - } - - private int getIntProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asInteger(properties.get(key)); - } - - @Override - public long getLongProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getLongProperty(properties, key); - } - - private long getLongProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asLong(properties.get(key)); - } - - @Override - public float getFloatProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getFloatProperty(properties, key); - } - - private float getFloatProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asFloat(properties.get(key)); - } - - @Override - public double getDoubleProperty(String 
key) throws JMSException { - checkIfStandardProperty(key); - return getDoubleProperty(properties, key); - } - - private double getDoubleProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asDouble(properties.get(key)); - } - - public Object getSelectorProcessingPropertyValue(String key) throws SelectorEvaluationException { - if (properties.containsKey(key)) return properties.get(key); - if (! standardProperties.contains(key)) return null; - - if (JMS_MESSAGE_ID.equals(key)) return getJMSMessageID(); - if (JMS_TIMESTAMP.equals(key)) return getJMSTimestamp(); - if (JMS_CORRELATION_ID.equals(key)) return getJMSCorrelationID(); - // We do not support this right now. - // if (JMS_CORRELATION_ID_AS_BYTES.equals(key)) return getJMSCorrelationIDAsBytes(); - if (JMS_REPLY_TO.equals(key)) return getJMSReplyTo(); - if (JMS_DESTINATION.equals(key)) return getJMSDestination(); - if (JMS_DELIVERY_MODE.equals(key)) { - // 3.8.1.3 Special Notes "When used in a message selector JMSDeliveryMode is treated as having the - // values âPERSISTENTâ and âNON_PERSISTENTâ." - final int deliveryMode = getJMSDeliveryMode(); - if (DeliveryMode.PERSISTENT == deliveryMode) return "PERSISTENT"; - if (DeliveryMode.NON_PERSISTENT == deliveryMode) return "NON_PERSISTENT"; - // unknown ! - if (logger.isInfoEnabled()) logger.info("Unknown delivery mode specified ... 
" + deliveryMode); - return null; - } - if (JMS_REDELIVERED.equals(key)) return getJMSRedelivered(); - if (JMS_TYPE.equals(key)) return getJMSType(); - if (JMS_EXPIRATION.equals(key)) return getJMSExpiration(); - if (JMS_PRIORITY.equals(key)) return getJMSPriority(); - - throw new SelectorEvaluationException("Unable to retrieve value for key : '" + key + "'"); - } - - @Override - public String getStringProperty(String key) throws JMSException { - checkIfStandardProperty(key); - return getStringProperty(properties, key); - } - - private String getStringProperty(Map<String, Object> properties, String key) throws JMSException { - return MessageUtil.asString(properties.get(key)); - } - - @Override - public Object getObjectProperty(String key) throws JMSException { - checkIfStandardProperty(key); - // if (!propertyExists(key)) return null; - - return properties.get(key); - } - - @Override - public Enumeration<String> getPropertyNames() throws JMSException { - return Collections.enumeration(properties.keySet()); - } - - @Override - public void setBooleanProperty(String key, boolean value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setByteProperty(String key, byte value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. 
attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setShortProperty(String key, short value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setIntProperty(String key, int value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setLongProperty(String key, long value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setFloatProperty(String key, float value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setDoubleProperty(String key, double value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. 
attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setStringProperty(String key, String value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - properties.put(key, value); - } - - @Override - public void setObjectProperty(String key, Object value) throws JMSException { - if (!MessageUtil.isValidKey(key)) throw new IllegalArgumentException("Invalid key " + key); - if (propertiesReadOnly) - throw new MessageNotWriteableException("Message not writable. attempt to set property " + - key + " = " + value); - checkIfStandardProperty(key); - - if (null == value || - value instanceof Boolean || - value instanceof Byte || - value instanceof Short || - value instanceof Integer || - value instanceof Long || - value instanceof Float || - value instanceof Double || - value instanceof byte[] || - value instanceof String) { - properties.put(key, value); - return ; } - - throw new MessageFormatException("Unsupported type for value " + value.getClass()); - } - - // JMS VIOLATION ? - // I am not sure if getting and setting standard properties is allowed via the generic - // get/set methods : the spec seems unclear on it. - // Some javadocs seem to indicate it is NOT allowed. Hence this check ... - // If it is allowed in JMS - to support it, we will need to have a if/else block within each set/get - // which delegates to corresponding jms header set/get ... 
- private void checkIfStandardProperty(String key) throws JMSException { - if (standardProperties.contains(key)) - throw new JMSException("Cannot get/set standard JMS properties using *Property api"); - } - - @Override - public void acknowledge() throws JMSException { - session.acknowledge(this); - } - - @Override - public void clearBody() throws JMSException { - // Clear the body of the message. - } - - public String getSourceName() { - return sourceName; - } - - public String getSubscriberId() { - return subscriberId; - } - - MessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException { - if (MessageImpl.class != getClass()) { - throw new JMSException("Unexpected to call MessageImpl's createClone from subclass " + getClass()); - } - return new MessageImpl(session, this, sourceTopicName, subscriberId); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("MessageImpl"); - sb.append("{session=").append(session); - sb.append(", jmsMessageId='").append(jmsMessageId).append('\''); - sb.append(", jmsTimestamp=").append(jmsTimestamp); - sb.append(", jmsCorrelationID='").append(jmsCorrelationID).append('\''); - sb.append(", jmsReplyTo=").append(jmsReplyTo); - sb.append(", jmsDestination=").append(jmsDestination); - sb.append(", jmsDeliveryMode=").append(jmsDeliveryMode); - sb.append(", jmsRedelivered=").append(jmsRedelivered); - sb.append(", jmsType='").append(jmsType).append('\''); - sb.append(", jmsExpiration=").append(jmsExpiration); - sb.append(", jmsPriority=").append(jmsPriority); - sb.append(", properties=").append(properties); - sb.append(", standardPropertiesExists=").append(standardPropertiesExists); - sb.append(", standardPropertiesExistsForWire=").append(standardPropertiesExistsForWire); - sb.append(", sourceName='").append(sourceName).append('\''); - sb.append(", subscriberId='").append(subscriberId).append('\''); - sb.append('}'); - return sb.toString(); 
- } - - void reset() throws JMSException { - // noop ... children will override to do needful. - } - - public Runnable getAckRunnable() { - return ackRunnable; - } -}
