http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java deleted file mode 100644 index 15b75ee..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/MessageUtil.java +++ /dev/null @@ -1,444 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.jms.message; - - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.message.header.MetadataProcessor; -import org.apache.hedwig.protocol.PubSubProtocol; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageFormatException; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Bunch of simple util methods to reduce code in the implementation. - */ -public class MessageUtil { - - // The various message types supported. - public enum SupportedMessageTypes { - ONLY_MESSAGE((byte) 0), - TEXT((byte) 1), - BYTES((byte) 2), - MAP((byte) 3), - STREAM((byte) 4), - OBJECT((byte) 5); - - private final byte type; - private SupportedMessageTypes(byte type){ - this.type = type; - } - - public byte getType() { - return type; - } - } - - private static final Map<Byte, SupportedMessageTypes> valueToSupportedMessageType; - static { - SupportedMessageTypes[] arr = SupportedMessageTypes.values(); - Map<Byte, SupportedMessageTypes> map = new HashMap<Byte, SupportedMessageTypes>(arr.length); - for (SupportedMessageTypes type : arr){ - map.put(type.getType(), type); - } - valueToSupportedMessageType = Collections.unmodifiableMap(map); - } - - public static boolean asBoolean(Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! 
- // if (null == value) return false; - if (null == value) return Boolean.valueOf((String) value); - - if (value instanceof Boolean) return (Boolean) value; - if (value instanceof String) return Boolean.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static byte asByte(Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! - // if (null == value) return 0; - if (null == value) return Byte.valueOf((String) value); - - if (value instanceof Byte) return (Byte) value; - if (value instanceof String) return Byte.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static short asShort(Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! - // if (null == value) return 0; - if (null == value) return Short.valueOf((String) value); - - if (value instanceof Byte) return (Byte) value; - if (value instanceof Short) return (Short) value; - if (value instanceof String) return Short.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static int asInteger(Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! - // if (null == value) return 0; - if (null == value) return Integer.valueOf((String) value); - - if (value instanceof Byte) return (Byte) value; - if (value instanceof Short) return (Short) value; - if (value instanceof Integer) return (Integer) value; - if (value instanceof String) return Integer.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static long asLong(Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! 
- // if (null == value) return 0; - if (null == value) return Long.valueOf((String) value); - - if (value instanceof Byte) return (Byte) value; - if (value instanceof Short) return (Short) value; - if (value instanceof Integer) return (Integer) value; - if (value instanceof Long) return (Long) value; - if (value instanceof String) return Long.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static float asFloat(Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! - // if (null == value) return 0.0f; - if (null == value) return Float.valueOf((String) value); - - if (value instanceof Float) return (Float) value; - if (value instanceof String) return Float.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static double asDouble (Object value) throws MessageFormatException { - // The JMS spec explicitly wants us to raise NPE ! 
- // if (null == value) return 0.0; - if (null == value) return Double.valueOf((String) value); - - if (value instanceof Float) return (Float) value; - if (value instanceof Double ) return (Double) value; - if (value instanceof String) return Double.valueOf((String) value); - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static Double asDoubleSelectorProcessing(Object value) throws MessageFormatException { - if (null == value) return null; - - if (value instanceof Float) return (double) (Float) value; - if (value instanceof Double ) return (Double) value; - - if (value instanceof Long) return (double) (Long) value; - if (value instanceof Integer) return (double) (Integer) value; - if (value instanceof Short) return (double) (Short) value; - if (value instanceof Byte) return (double) (Byte) value; - - return null; - } - - public static Integer asIntegerSelectorProcessing(Object value) throws MessageFormatException { - if (null == value) return null; - - if (value instanceof Float) return (int) (float) (Float) value; - if (value instanceof Double ) return (int) (double) (Double) value; - - if (value instanceof Long) return (int) (long) (Long) value; - if (value instanceof Integer) return (Integer) value; - if (value instanceof Short) return (int) (Short) value; - if (value instanceof Byte) return (int) (Byte) value; - - return null; - } - - public static String asString(Object value) { - if (null == value) return null; - - if (value instanceof String) return (String) value; - // converts from boolean, byte, short, char, int, long, float and double to String. 
- return "" + value; - } - - public static char asChar(Object value) throws MessageFormatException { - // treat it as integer with null - if (null == value) return (char) 0; - - // only from/to char - if (value instanceof Character) return (Character) value; - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static byte[] asBytes(Object value) throws MessageFormatException { - if (null == value || value instanceof byte[]) return (byte[]) value; - throw new MessageFormatException("Unsupported property type " + value.getClass() + " for " + value); - } - - public static boolean isValidKey(String key) { - return null != key && 0 != key.length(); - } - - public static byte[] objectToBytes(Object obj) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(128); - ObjectOutputStream oos = new ObjectOutputStream(baos); - try { - oos.writeObject(obj); - oos.flush(); - } finally { - try { oos.close(); } catch (IOException ioEx) { /* ignore */ } - } - - return baos.toByteArray(); - } - - public static Object bytesToObject(byte[] data) throws IOException { - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); - try { - return ois.readObject(); - } catch (ClassNotFoundException cnfEx){ - // unexpected ! - throw new IllegalStateException("Unexpected", cnfEx); - } finally { - try { ois.close(); } catch(IOException ioEx) { /* ignore */ } - } - } - - - - public static MessageImpl processHedwigMessage(SessionImpl session, PubSubProtocol.Message message, - String sourceTopicName, String subscriberId, - Runnable ackRunnable) throws JMSException { - Map<String, Object> map = MetadataProcessor.parseHeaders(message); - - Object jmsBodyTypeValue = map.get(MessageImpl.JMS_MESSAGE_TYPE_KEY); - // Should we treat these as bytes message by default ? - // if (! 
(jmsBodyTypeValue instanceof Byte) ) - // throw new JMSException("Unsupported message : " + message + ", unable to determine jms message type " + - // jmsBodyTypeValue); - if (! (jmsBodyTypeValue instanceof Byte) ) jmsBodyTypeValue = (Byte) SupportedMessageTypes.BYTES.getType(); - - SupportedMessageTypes type = valueToSupportedMessageType.get((Byte) jmsBodyTypeValue); - switch (type){ - case STREAM: - return new StreamMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable); - case MAP: - return new MapMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable); - case TEXT: - return new TextMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable); - case OBJECT: - return new ObjectMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable); - case BYTES: - return new BytesMessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable); - case ONLY_MESSAGE: - return new MessageImpl(session, message, map, sourceTopicName, subscriberId, ackRunnable); - default: - throw new JMSException("Unsupported message type : " + type + " for message " + message); - } - } - - public static MessageImpl createMessageCopy(SessionImpl session, Message message) throws JMSException { - if (message instanceof MessageImpl) { - return createMessageImplCopy(session, (MessageImpl) message); - } - - if (message instanceof BytesMessage) { - return new BytesMessageImpl((BytesMessage) message, session); - } - if (message instanceof MapMessage) { - return new MapMessageImpl((MapMessage) message, session); - } - if (message instanceof ObjectMessage) { - return new ObjectMessageImpl((ObjectMessage) message, session); - } - if (message instanceof StreamMessage) { - return new StreamMessageImpl((StreamMessage) message, session); - } - if (message instanceof TextMessage) { - return new TextMessageImpl((TextMessage) message, session); - } - - return new MessageImpl(message, session); - } - - private static 
MessageImpl createMessageImplCopy(SessionImpl session, MessageImpl message) - throws JMSException { - - if (message instanceof BytesMessageImpl) { - return new BytesMessageImpl(session, (BytesMessageImpl) message, message.getSourceName(), - message.getSubscriberId()); - } - if (message instanceof MapMessageImpl) { - return new MapMessageImpl(session, (MapMessageImpl) message, message.getSourceName(), - message.getSubscriberId()); - } - if (message instanceof ObjectMessageImpl) { - return new ObjectMessageImpl(session, (ObjectMessageImpl) message, message.getSourceName(), - message.getSubscriberId()); - } - if (message instanceof StreamMessageImpl) { - return new StreamMessageImpl(session, (StreamMessageImpl) message, message.getSourceName(), - message.getSubscriberId()); - } - if (message instanceof TextMessageImpl) { - return new TextMessageImpl(session, (TextMessageImpl) message, message.getSourceName(), - message.getSubscriberId()); - } - - return new MessageImpl(session, message, message.getSourceName(), message.getSubscriberId()); - } - - private static final String JMS_MESSAGE_ID_PREFIX = "ID:"; - private static final String LOCAL_PREFIX = "LOCAL("; - private static final String REMOTE_PREFIX = "REMOTE("; - private static final char SEQ_ID_SUFFIX = ')'; - private static final char REMOTE_RECORD_SEPARATOR = ','; - private static final char REMOTE_RECORD_SEQ_ID_PREFIX = '['; - private static final char REMOTE_RECORD_SEQ_ID_SUFFIX = ']'; - private static final Pattern remoteMessageIdSplitPattern = Pattern.compile("" + REMOTE_RECORD_SEPARATOR); - - /** - * Based on - * {@link org.apache.hedwig.admin.console.ReadTopic#formatMessage(PubSubProtocol.Message)} - * - * This is tightly coupled with - * @see #generateSeqIdFromJMSMessageId(String) - * - * @param seqId The sequence id to convert to string. - * @return The string representation of the seq-id. 
- */ - public static String generateJMSMessageIdFromSeqId(final PubSubProtocol.MessageSeqId seqId) { - StringBuilder sb = new StringBuilder(); - // mandatory prefix for system generated id's. - sb.append(JMS_MESSAGE_ID_PREFIX); - - if (seqId.hasLocalComponent()) { - sb.append(LOCAL_PREFIX).append(seqId.getLocalComponent()).append(SEQ_ID_SUFFIX); - } else { - List<PubSubProtocol.RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList(); - boolean first = true; - - sb.append(REMOTE_PREFIX); - for (PubSubProtocol.RegionSpecificSeqId rssid : remoteIds) { - if (!first) sb.append(REMOTE_RECORD_SEPARATOR); - first = false; - sb.append(rssid.getRegion().toStringUtf8()); - sb.append(REMOTE_RECORD_SEQ_ID_PREFIX); - sb.append(rssid.getSeqId()); - sb.append(REMOTE_RECORD_SEQ_ID_SUFFIX); - } - sb.append(SEQ_ID_SUFFIX); - } - - return sb.toString(); - } - - /** - * Based on - * {@link org.apache.hedwig.admin.console.ReadTopic#formatMessage(PubSubProtocol.Message)} - * - * This is tightly coupled with - * @see #generateJMSMessageIdFromSeqId(org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId) - * @param messageId The message id to convert to string. - * @return The seq-id - * @throws javax.jms.JMSException In case of exceptions doing the conversion. - */ - public static PubSubProtocol.MessageSeqId generateSeqIdFromJMSMessageId(final String messageId) - throws JMSException { - if (null == messageId || !messageId.startsWith(JMS_MESSAGE_ID_PREFIX)) { - throw new JMSException("Invalid messageId specified '" + messageId + "'"); - } - - PubSubProtocol.MessageSeqId.Builder builder = PubSubProtocol.MessageSeqId.newBuilder(); - // local ? 
- if (messageId.regionMatches(JMS_MESSAGE_ID_PREFIX.length(), LOCAL_PREFIX, 0, LOCAL_PREFIX.length())){ - try { - long seqId = Long.parseLong(messageId.substring(JMS_MESSAGE_ID_PREFIX.length() + - LOCAL_PREFIX.length(), messageId.length() - 1)); - builder.setLocalComponent(seqId); - } catch (NumberFormatException nfEx){ - JMSException jEx = new JMSException("Unable to parse local seq id from '" + - messageId + "' .. " + nfEx); - jEx.setLinkedException(nfEx); - throw jEx; - } - } - else { - assert messageId.regionMatches(JMS_MESSAGE_ID_PREFIX.length(), REMOTE_PREFIX, 0, - REMOTE_PREFIX.length()); - - final String[] remoteParts; - { - final String remoteMessageId = messageId.substring(JMS_MESSAGE_ID_PREFIX.length() + - REMOTE_PREFIX.length(), messageId.length() - 1); - // Should ew stop using pattern and move to using indexOf's ? - remoteParts = remoteMessageIdSplitPattern.split(remoteMessageId); - } - - for (String remote : remoteParts){ - if (REMOTE_RECORD_SEQ_ID_SUFFIX != remote.charAt(remote.length() - 1)) - throw new JMSException("Invalid remote region snippet (no seq suffix) '" + - remote + "' within '" + messageId); - final int regionIndx = remote.indexOf(REMOTE_RECORD_SEQ_ID_PREFIX); - if (-1 == regionIndx) - throw new JMSException("Invalid remote region snippet (no region) '" + remote + - "' within '" + messageId); - final String region = remote.substring(0, regionIndx); - final long seqId; - - - try { - seqId = Long.parseLong(remote.substring(regionIndx + 1, remote.length() - 1)); - } catch (NumberFormatException nfEx){ - JMSException jEx = new JMSException("Unable to parse remote seq id from '" + - remote + "' within '" + messageId + "' .. 
" + nfEx); - jEx.setLinkedException(nfEx); - throw jEx; - } - - PubSubProtocol.RegionSpecificSeqId.Builder rbuilder = - PubSubProtocol.RegionSpecificSeqId.newBuilder(); - rbuilder.setRegion(ByteString.copyFromUtf8(region)); - rbuilder.setSeqId(seqId); - builder.addRemoteComponents(rbuilder); - } - } - - return builder.build(); - } - - public static MessageImpl createCloneForDispatch(SessionImpl session, MessageImpl msg, - String sourceTopicName, String subscriberId) throws JMSException { - MessageImpl retval = msg.createClone(session, sourceTopicName, subscriberId); - retval.reset(); - return retval; - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java deleted file mode 100644 index ba26c4c..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/ObjectMessageImpl.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.message; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.protocol.PubSubProtocol; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageNotWriteableException; -import javax.jms.ObjectMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.Map; - -/** - * read/write serializable java object ... 
- * - */ -public class ObjectMessageImpl extends MessageImpl implements ObjectMessage { - private Serializable payload; - private boolean readMode; - - public ObjectMessageImpl(SessionImpl session, Serializable payload) { - super(session); - this.payload = payload; - this.readMode = false; - } - - public ObjectMessageImpl(SessionImpl session, ObjectMessageImpl message, String sourceTopicName, - String subscriberId) throws JMSException { - super(session, (MessageImpl) message, sourceTopicName, subscriberId); - - this.payload = copySerializable(message.getObject()); - this.readMode = message.readMode; - } - - private Serializable copySerializable(Serializable object) throws JMSException { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(128); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(object); - oos.flush(); - oos.close(); - baos.flush(); - baos.close(); - - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())); - return (Serializable) ois.readObject(); - } catch (IOException e){ - JMSException jmsEx = new javax.jms.IllegalStateException("Unexpected exception"); - jmsEx.setLinkedException(e); - throw jmsEx; - } catch (ClassNotFoundException e) { - JMSException jmsEx = new javax.jms.IllegalStateException("Unexpected exception"); - jmsEx.setLinkedException(e); - throw jmsEx; - } - } - - // To clone a message from a ObjectMessage which is NOT ObjectMessageImpl - // Changing order of parameter to NOT accidentally clash with the constructor above. - // This is midly confusing, but helps a lot in preventing accidental bugs ! 
- public ObjectMessageImpl(ObjectMessage message, SessionImpl session) throws JMSException { - super((Message) message, session); - - if (message instanceof ObjectMessageImpl) { - throw new JMSException("Coding bug - should use this constructor ONLY for non ObjectMessageImpl messages"); - } - - - this.payload = message.getObject(); - this.readMode = false; - } - - public ObjectMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties, - String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException { - super(session, message, properties, sourceTopicName, subscriberId, ackRunnable); - - try { - this.payload = hasBodyFromProperties() ? - (Serializable) MessageUtil.bytesToObject(message.getBody().toByteArray()) : null; - } catch (IOException e) { - JMSException ex = new JMSException("Unable to read message data .. " + e); - ex.setLinkedException(e); - throw ex; - } - this.readMode = true; - } - - @Override - protected MessageUtil.SupportedMessageTypes getJmsMessageType() { - return MessageUtil.SupportedMessageTypes.OBJECT; - } - - @Override - public PubSubProtocol.Message generateHedwigMessage() throws JMSException { - PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder(); - super.populateBuilderWithHeaders(builder); - - // Now set body and type. - try { - if (! isBodyEmpty()) builder.setBody(ByteString.copyFrom(MessageUtil.objectToBytes(this.payload))); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to read message data .. 
" + e); - ex.setLinkedException(e); - throw ex; - } - - return builder.build(); - } - - protected boolean isBodyEmpty(){ - return null == this.payload; - } - - @Override - public void setObject(Serializable payload) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - this.payload = payload; - } - - @Override - public Serializable getObject() throws JMSException { - return payload; - } - - @Override - public void clearBody() throws JMSException { - super.clearBody(); - // allow read and write. - this.payload = null; - this.readMode = false; - } - - @Override - public void reset() throws JMSException { - if (this.readMode) return ; - this.readMode = true; - } - - @Override - ObjectMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) - throws JMSException { - - return new ObjectMessageImpl(session, this, sourceTopicName, subscriberId); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java deleted file mode 100644 index 2aa74a2..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/StreamMessageImpl.java +++ /dev/null @@ -1,752 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.message; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.Mutable; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.protocol.PubSubProtocol; - -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageEOFException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import javax.jms.StreamMessage; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Deque; -import java.util.Map; - -/** - * Though similar to BytesMessageImpl, the difference is that BytesMessage expects the user to know - * the schema while - * StreamMessage user expects type conversion, etc. - * - * In our case, the stream is not a true open stream to the server; it is buffered in memory. 
- */ -public class StreamMessageImpl extends MessageImpl implements StreamMessage { - private ReadOnlyMessage readOnlyMessage; - private WriteOnlyMessage writeOnlyMessage; - private boolean readMode; - - public StreamMessageImpl(SessionImpl session) throws JMSException { - super(session); - clearBody(); - } - - // To clone a message - public StreamMessageImpl(SessionImpl session, StreamMessageImpl message, String sourceTopicName, - String subscriberId) throws JMSException { - super(session, (MessageImpl) message, sourceTopicName, subscriberId); - try { - if (message.readMode){ - this.readOnlyMessage = new ReadOnlyMessage(message.getPayloadData()); - this.writeOnlyMessage = null; - } - else { - this.readOnlyMessage = null; - this.writeOnlyMessage = new WriteOnlyMessage(message.getPayloadData()); - } - } catch (IOException e) { - JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e); - ex.setLinkedException(e); - throw ex; - } - - this.readMode = message.readMode; - } - - // To clone a message from a StreamMessage which is NOT StreamMessageImpl - // Changing order of parameter to NOT accidentally clash with the constructor above. - // This is midly confusing, but helps a lot in preventing accidental bugs ! - public StreamMessageImpl(StreamMessage message, SessionImpl session) throws JMSException { - super((Message) message, session); - - if (message instanceof StreamMessageImpl) { - throw new JMSException("Coding bug - should use this constructor ONLY for non StreamMessageImpl messages"); - } - - final byte[] data; - try { - WriteOnlyMessage wom = new WriteOnlyMessage(); - try { - Object obj; - while (null != (obj = message.readObject())){ - wom.writeObject(obj); - } - } catch (EOFException eof){ - // ignore ... - } - data = wom.getPayloadAsBytes(null); - } catch (IOException e) { - JMSException jEx = new JMSException("Unable to write to internal message .. 
" + e); - jEx.setLinkedException(e); - throw jEx; - } - - this.writeOnlyMessage = new WriteOnlyMessage(data); - - this.readOnlyMessage = null; - this.readMode = false; - } - - StreamMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties, - String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException { - super(session, message, properties, sourceTopicName, subscriberId, ackRunnable); - - final byte[] data = message.getBody().toByteArray(); - try { - this.readOnlyMessage = new ReadOnlyMessage(data); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to clone/copy input message " + message + " .. " + e); - ex.setLinkedException(e); - throw ex; - } - - this.writeOnlyMessage = null; - this.readMode = true; - } - - @Override - protected MessageUtil.SupportedMessageTypes getJmsMessageType() { - return MessageUtil.SupportedMessageTypes.STREAM; - } - - protected boolean isBodyEmpty(){ - return false; - } - - @Override - public PubSubProtocol.Message generateHedwigMessage() throws JMSException { - PubSubProtocol.Message.Builder builder = PubSubProtocol.Message.newBuilder(); - super.populateBuilderWithHeaders(builder); - - // Now set body and type. - try { - byte[] data = getPayloadData(); - builder.setBody(ByteString.copyFrom(data)); - } catch (IOException e) { - JMSException ex = new JMSException("Unable to read message data .. 
" + e); - ex.setLinkedException(e); - throw ex; - } - - return builder.build(); - } - - - @Override - public boolean readBoolean() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readBoolean(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public byte readByte() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readByte(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public short readShort() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readShort(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public char readChar() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readChar(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public int readInt() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readInt(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public long readLong() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readLong(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - 
eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public float readFloat() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readFloat(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public double readDouble() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readDouble(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public String readString() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readString(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public int readBytes(byte[] data) throws JMSException { - throw new UnsupportedOperationException("Please use readObject - this method is not supported"); - } - - @Override - public Object readObject() throws JMSException { - if (!readMode) throw new MessageNotReadableException("Message not readable"); - try { - return readOnlyMessage.readObject(); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("ioEx ?"); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeBoolean(boolean val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeBoolean(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeByte(byte val) throws JMSException { - if (readMode) throw 
new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeByte(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeShort(short val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeShort(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeChar(char val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeChar(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeInt(int val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeInt(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeLong(long val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeLong(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeFloat(float val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeFloat(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw 
eofEx; - } - } - - @Override - public void writeDouble(double val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeDouble(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeString(String val) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeString(val); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeBytes(byte[] data) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeBytes(data); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void writeBytes(byte[] data, int offset, int length) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeBytes(data, offset, length); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - // This method is ONLY supposed to be used for object form of primitive types ! 
- @Override - public void writeObject(Object obj) throws JMSException { - if (readMode) throw new MessageNotWriteableException("Message not writable"); - try { - writeOnlyMessage.writeObject(obj); - } catch (IOException ioEx){ - JMSException eofEx = new JMSException("Unexpected ioex : " + ioEx); - eofEx.setLinkedException(ioEx); - throw eofEx; - } - } - - @Override - public void reset() throws JMSException { - if (this.readMode) return ; - this.readMode = true; - try { - byte[] data = writeOnlyMessage.getPayloadAsBytes(null); - this.readOnlyMessage = new ReadOnlyMessage(data); - } catch (IOException e) { - JMSException ex = new JMSException("cant convert to read only message ... unexpected actually .. " + e); - ex.setLinkedException(e); - throw ex; - } - this.writeOnlyMessage = null; - } - - @Override - public void clearBody() throws JMSException { - super.clearBody(); - this.writeOnlyMessage = new WriteOnlyMessage(); - this.readOnlyMessage = null; - this.readMode = false; - } - - private byte[] getPayloadData() throws IOException, IllegalStateException { - if (readMode) return readOnlyMessage.getDataCopy(); - - Mutable<byte[]> preCloseData = new Mutable<byte[]>(null); - byte[] data = writeOnlyMessage.getPayloadAsBytes(preCloseData); - - writeOnlyMessage = new WriteOnlyMessage(preCloseData.getValue()); - return data; - } - - - @Override - StreamMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) - throws JMSException { - return new StreamMessageImpl(session, this, sourceTopicName, subscriberId); - } - - // Using java object's instead of primitives to avoid having to store schema separately. 
- private static class ReadOnlyMessage { - - private final ObjectInputStream ois; - private final byte[] data; - private final Deque<Object> unreadObjects = new ArrayDeque<Object>(4); - - public ReadOnlyMessage(byte[] data) throws IOException { - this.data = data; - this.ois = new ObjectInputStream(new ByteArrayInputStream(data)); - } - - public byte[] getDataCopy(){ - return Arrays.copyOf(data, data.length); - } - - private void unreadObject(Object obj) { - unreadObjects.push(obj); - } - - private Object readNextObject() throws IOException, JMSException { - try { - if (! unreadObjects.isEmpty()) return unreadObjects.pop(); - - return ois.readObject(); - } catch (ClassNotFoundException e) { - // unexpected ! - javax.jms.IllegalStateException jEx = - new javax.jms.IllegalStateException("Unexpected not to be able to resolve class"); - jEx.setLinkedException(e); - throw jEx; - } catch (EOFException eof) { - throw new MessageEOFException("eof"); - } - } - - public boolean readBoolean() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Boolean value = MessageUtil.asBoolean(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public byte readByte() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Byte value = MessageUtil.asByte(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public short readShort() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Short value = MessageUtil.asShort(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public char readChar() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Character value = MessageUtil.asChar(obj); - failed = false; - return value; - } finally { - if (failed) 
unreadObject(obj); - } - } - - public int readInt() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Integer value = MessageUtil.asInteger(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public long readLong() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Long value = MessageUtil.asLong(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public float readFloat() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Float value = MessageUtil.asFloat(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public double readDouble() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - Double value = MessageUtil.asDouble(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public String readString() throws IOException, JMSException { - Object obj = readNextObject(); - boolean failed = true; - try { - String value = MessageUtil.asString(obj); - failed = false; - return value; - } finally { - if (failed) unreadObject(obj); - } - } - - public Object readObject() throws IOException, JMSException { - return readNextObject(); - } - } - - private static class WriteOnlyMessage { - - private final ByteArrayOutputStream baos; - // private ObjectOutputStream oos; - private final ObjectOutputStream oos; - - public WriteOnlyMessage() throws JMSException { - baos = new ByteArrayOutputStream(); - try { - oos = new ObjectOutputStream(baos); - } catch (IOException e) { - IllegalStateException jEx = - new IllegalStateException("Unexpected to not be able to create empty write only message"); - jEx.setLinkedException(e); - throw jEx; - } - } - - private WriteOnlyMessage(final byte[] data) throws 
IllegalStateException { - baos = new ByteArrayOutputStream(); - try { - if (null != data) baos.write(data); - baos.flush(); - oos = new ObjectOutputStream(baos){ - // Do not write the header if data is based on already materialized stream. - protected void writeStreamHeader() throws IOException { - if (null == data || 0 == data.length) super.writeStreamHeader(); - } - }; - } catch (IOException e) { - IllegalStateException jEx = - new IllegalStateException("Unexpected to not be able to create empty write only message"); - jEx.setLinkedException(e); - throw jEx; - } - } - - public byte[] getPayloadAsBytes(Mutable<byte[]> preCloseData) throws IOException { - oos.flush(); - baos.flush(); - if (null != preCloseData) preCloseData.setValue(baos.toByteArray()); - oos.close(); - baos.flush(); - baos.close(); - // oos = null; - return baos.toByteArray(); - } - - public void writeBoolean(boolean val) throws IOException { - oos.writeObject(val); - } - - public void writeByte(byte val) throws IOException { - oos.writeObject(val); - } - - public void writeShort(short val) throws IOException { - oos.writeObject(val); - } - - public void writeChar(char val) throws IOException { - oos.writeObject(val); - } - - public void writeInt(int val) throws IOException { - oos.writeObject(val); - } - - public void writeLong(long val) throws IOException { - oos.writeObject(val); - } - - public void writeFloat(float val) throws IOException { - oos.writeObject(val); - } - - public void writeDouble(double val) throws IOException { - oos.writeObject(val); - } - - public void writeString(String val) throws IOException { - oos.writeObject(val); - } - - public void writeBytes(byte[] data) throws IOException { - oos.writeObject(data); - } - - // copy and write as a single byte array. 
- public void writeBytes(byte[] data, int offset, int length) throws IOException { - byte[] arr = new byte[length]; - System.arraycopy(data, offset, arr, 0, length); - writeBytes(arr); - } - - public void writeObject(Object obj) throws JMSException, IOException { - // unrolling it - if (obj instanceof Boolean) { - writeBoolean((Boolean) obj); - } - else if (obj instanceof Byte) { - writeByte((Byte) obj); - } - else if (obj instanceof Short) { - writeShort((Short) obj); - } - else if (obj instanceof Character) { - writeChar((Character) obj); - } - else if (obj instanceof Integer) { - writeInt((Integer) obj); - } - else if (obj instanceof Long) { - writeLong((Long) obj); - } - else if (obj instanceof Float) { - writeFloat((Float) obj); - } - else if (obj instanceof Double) { - writeDouble((Double) obj); - } - else if (obj instanceof String) { - writeString((String) obj); - } - else if (obj instanceof byte[]) { - writeBytes((byte[]) obj); - } - else{ - throw new JMSException("Unsupported type for obj : " + obj.getClass()); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java deleted file mode 100644 index dc3a3ca..0000000 --- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/message/TextMessageImpl.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.jms.message; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.protocol.PubSubProtocol; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageNotWriteableException; -import javax.jms.TextMessage; -import java.util.Map;

/**
 * Text message whose body is a single String payload. The message is either writable
 * (setText allowed) or in read mode (setText rejected); reset() and clearBody() switch
 * between the two.
 */
public class TextMessageImpl extends MessageImpl implements TextMessage {
    private String payload;
    private boolean readMode;

    /** Creates a writable message without a body. */
    public TextMessageImpl(SessionImpl session) {
        this(session, (String) null);
    }

    /** Creates a writable message carrying the given text. */
    public TextMessageImpl(SessionImpl session, String payload) {
        super(session);
        this.readMode = false;
        this.payload = payload;
    }

    /** Copy constructor: takes over both the text and the read/write mode of {@code message}. */
    public TextMessageImpl(SessionImpl session, TextMessageImpl message, String sourceTopicName,
                           String subscriberId) throws JMSException {
        super(session, (MessageImpl) message, sourceTopicName, subscriberId);

        this.readMode = message.readMode;
        this.payload = message.getText();
    }


    // Clones a message from a TextMessage which is NOT a TextMessageImpl.
    // The parameter order is swapped on purpose so this cannot accidentally clash with the
    // copy constructor above. Mildly confusing, but it prevents accidental bugs.
    public TextMessageImpl(TextMessage message, SessionImpl session) throws JMSException {
        super((Message) message, session);

        final boolean alreadyOurType = message instanceof TextMessageImpl;
        if (alreadyOurType) {
            throw new JMSException("Coding bug - should use this constructor ONLY for non TextMessageImpl messages");
        }

        this.readMode = false;
        this.payload = message.getText();
    }

    /**
     * Builds a message from a wire message. The body is decoded as UTF-8 only when
     * hasBodyFromProperties() reports one; the resulting message starts in read mode.
     */
    public TextMessageImpl(SessionImpl session, PubSubProtocol.Message message, Map<String, Object> properties,
                           String sourceTopicName, String subscriberId, Runnable ackRunnable) throws JMSException {
        super(session, message, properties, sourceTopicName, subscriberId, ackRunnable);

        if (hasBodyFromProperties()) {
            this.payload = message.getBody().toStringUtf8();
        } else {
            this.payload = null;
        }
        this.readMode = true;
    }

    @Override
    protected MessageUtil.SupportedMessageTypes getJmsMessageType() {
        return MessageUtil.SupportedMessageTypes.TEXT;
    }

    /** Produces the wire message: headers from the superclass plus the UTF-8 body when there is one. */
    @Override
    public PubSubProtocol.Message generateHedwigMessage() throws JMSException {
        final PubSubProtocol.Message.Builder wire = PubSubProtocol.Message.newBuilder();
        super.populateBuilderWithHeaders(wire);
        if (isBodyEmpty()) {
            return wire.build();
        }
        wire.setBody(ByteString.copyFromUtf8(this.payload));
        return wire.build();
    }

    /** A body counts as empty only when no text is set at all (null). */
    protected boolean isBodyEmpty(){
        return this.payload == null;
    }

    /** Replaces the text; rejected with MessageNotWriteableException while in read mode. */
    @Override
    public void setText(String payload) throws JMSException {
        if (readMode) {
            throw new MessageNotWriteableException("Message not writable");
        }
        this.payload = payload;
    }

    @Override
    public String getText() throws JMSException {
        return this.payload;
    }

    /** Clears the body via the superclass, drops the text and makes the message writable again. */
    @Override
    public void clearBody() throws JMSException {
        super.clearBody();
        this.readMode = false;
        this.payload = null;
    }

    /** Switches to read mode; a no-op when already there. */
    @Override
    public void reset() throws JMSException {
        if (!this.readMode) {
            this.readMode = true;
        }
    }

    @Override
    TextMessageImpl createClone(SessionImpl session, String sourceTopicName, String subscriberId) throws JMSException {
        return new TextMessageImpl(session, this, sourceTopicName, subscriberId);
    }

    @Override
    public String toString() {
        return "TextMessageImpl"
            + "{payload='" + payload + '\''
            + ", readMode=" + readMode
            + ", parent=" + super.toString()
            + '}';
    }
}
