Author: veithen
Date: Thu Jul 24 14:46:31 2008
New Revision: 679558

URL: http://svn.apache.org/viewvc?rev=679558&view=rev
Log:
Added necessary infrastructure to implement SYNAPSE-304 for JMS transport (no 
changes to transport yet).

Added:
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
Modified:
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java?rev=679558&r1=679557&r2=679558&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
 Thu Jul 24 14:46:31 2008
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Reader;
 
 import javax.xml.namespace.QName;
 
@@ -27,7 +28,6 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMFactory;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.builder.Builder;
 import org.apache.axis2.builder.BuilderUtil;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.Parameter;
@@ -42,19 +42,19 @@
  * the text in a wrapper element. The name of the wrapper element can
  * be configured as a service parameter (see [EMAIL PROTECTED] 
BaseConstants#WRAPPER_PARAM}).
  * It defaults to [EMAIL PROTECTED] BaseConstants#DEFAULT_TEXT_WRAPPER}.
- * If the provided content type specifies a <tt>charset</tt> parameter
- * (e.g. <tt>text/plain; charset=ISO-8859-15</tt>) it is used to decode the 
text.
- * Otherwise the default charset encoding specified by
+ * If the content is provided as an [EMAIL PROTECTED] InputStream} and the 
content type specifies a
+ * <tt>charset</tt> parameter (e.g. <tt>text/plain; charset=ISO-8859-15</tt>),
+ * this information is used to decode the text.
+ * If the content is provided as an [EMAIL PROTECTED] InputStream} but no 
<tt>charset</tt> parameter
+ * is specified on the content type, the default charset encoding specified by
  * [EMAIL PROTECTED] MessageContext#DEFAULT_CHAR_SET_ENCODING} is used.
  */
-public class PlainTextBuilder implements Builder {
-    public OMElement processDocument(InputStream inputStream,
-                                     String contentType,
-                                     MessageContext msgContext) throws 
AxisFault {
-
+public class PlainTextBuilder implements TextMessageBuilder {
+    private OMElement buildMessage(String textPayload, MessageContext 
msgContext) {
         QName wrapperQName = BaseConstants.DEFAULT_TEXT_WRAPPER;
         if (msgContext.getAxisService() != null) {
-            Parameter wrapperParam = 
msgContext.getAxisService().getParameter(BaseConstants.WRAPPER_PARAM);
+            Parameter wrapperParam
+                    = 
msgContext.getAxisService().getParameter(BaseConstants.WRAPPER_PARAM);
             if (wrapperParam != null) {
                 wrapperQName = 
BaseUtils.getQNameFromString(wrapperParam.getValue());
             }
@@ -62,14 +62,35 @@
 
         OMFactory factory = OMAbstractFactory.getOMFactory();
         OMElement wrapper = factory.createOMElement(wrapperQName, null);
+        wrapper.addChild(factory.createOMText(textPayload));
+        return wrapper;
+    }
+    
+    public OMElement processDocument(InputStream inputStream,
+                                     String contentType,
+                                     MessageContext msgContext) throws 
AxisFault {
+
         String charSetEnc = BuilderUtil.getCharSetEncoding(contentType);
-        String textPayload;
         try {
-            textPayload = IOUtils.toString(inputStream, charSetEnc);
+            return buildMessage(IOUtils.toString(inputStream, charSetEnc), 
msgContext);
         } catch (IOException ex) {
             throw new AxisFault("Unable to read message payload", ex);
         }
-        wrapper.addChild(factory.createOMText(textPayload));
-        return wrapper;
+    }
+
+    public OMElement processDocument(Reader reader,
+                                     String contentType,
+                                     MessageContext msgContext) throws 
AxisFault {
+        try {
+            return buildMessage(IOUtils.toString(reader), msgContext);
+        } catch (IOException ex) {
+            throw new AxisFault("Unable to read message payload", ex);
+        }
+    }
+
+    public OMElement processDocument(String content,
+                                     String contentType,
+                                     MessageContext msgContext) throws 
AxisFault {
+        return buildMessage(content, msgContext);
     }
 }

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java?rev=679558&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
 Thu Jul 24 14:46:31 2008
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.synapse.format;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+
+/**
+ * [EMAIL PROTECTED] InputStream} implementation that reads a character stream 
from a [EMAIL PROTECTED] Reader}
+ * and transforms it to a byte stream using a specified charset encoding. The 
stream
+ * is transformed using a [EMAIL PROTECTED] CharsetEncoder} object, 
guaranteeing that all charset
+ * encodings supported by the JRE are handled correctly. In particular for 
charsets such as
+ * UTF-16, the implementation ensures that one and only one byte order marker
+ * is produced.
+ * <p>
+ * Since in general it is not possible to predict the number of characters to 
be read from the
+ * [EMAIL PROTECTED] Reader} to satisfy a read request on the [EMAIL 
PROTECTED] ReaderInputStream}, all reads from
+ * the [EMAIL PROTECTED] Reader} are buffered. There is therefore no well 
defined correlation
+ * between the current position of the [EMAIL PROTECTED] Reader} and that of 
the [EMAIL PROTECTED] ReaderInputStream}.
+ * This also implies that in general there is no need to wrap the underlying 
[EMAIL PROTECTED] Reader}
+ * in a [EMAIL PROTECTED] java.io.BufferedReader}.
+ * <p>
+ * [EMAIL PROTECTED] ReaderInputStream} implements the inverse transformation 
of [EMAIL PROTECTED] java.io.InputStreamReader};
+ * in the following example, reading from <tt>in2</tt> would return the same 
byte
+ * sequence as reading from <tt>in</tt> (provided that the initial byte 
sequence is legal
+ * with respect to the charset encoding):
+ * <pre>
+ * InputStream in = ...
+ * Charset cs = ...
+ * InputStreamReader reader = new InputStreamReader(in, cs);
+ * ReaderInputStream in2 = new ReaderInputStream(reader, cs);</pre>
+ * [EMAIL PROTECTED] ReaderInputStream} implements the same transformation as 
[EMAIL PROTECTED] java.io.OutputStreamWriter},
+ * except that the control flow is reversed: both classes transform a 
character stream
+ * into a byte stream, but [EMAIL PROTECTED] java.io.OutputStreamWriter} 
pushes data to the underlying stream,
+ * while [EMAIL PROTECTED] ReaderInputStream} pulls it from the underlying 
stream.
+ * <p>
+ * Note that while there are use cases where there is no alternative to using
+ * this class, very often the need to use this class is an indication of a flaw
+ * in the design of the code. This class is typically used in situations where 
an existing
+ * API only accepts an [EMAIL PROTECTED] InputStream}, but where the most 
natural way to produce the data
+ * is as a character stream, i.e. by providing a [EMAIL PROTECTED] Reader} 
instance. An example of a situation
+ * where this problem may appear is when implementing the [EMAIL PROTECTED] 
javax.activation.DataSource}
+ * interface from the Java Activation Framework.
+ * <p>
+ * Given the fact that the [EMAIL PROTECTED] Reader} class doesn't provide any 
way to predict whether the next
+ * read operation will block or not, it is not possible to provide a meaningful
+ * implementation of the [EMAIL PROTECTED] InputStream#available()} method. A 
call to this method
+ * will always return 0. Also, this class doesn't support [EMAIL PROTECTED] 
InputStream#mark(int)}.
+ * <p>
+ * Instances of [EMAIL PROTECTED] ReaderInputStream} are not thread safe.
+ */
+public class ReaderInputStream extends InputStream {
+    private static final int DEFAULT_BUFFER_SIZE = 1024;
+    
+    private final Reader reader;
+    private final CharsetEncoder encoder;
+    
+    /**
+     * CharBuffer used as input for the decoder. It should be reasonably
+     * large as we read data from the underlying Reader into this buffer.
+     */
+    private final CharBuffer encoderIn;
+    
+    /**
+     * ByteBuffer used as output for the decoder. This buffer can be small
+     * as it is only used to transfer data from the decoder to the
+     * buffer provided by the caller.
+     */
+    private final ByteBuffer encoderOut = ByteBuffer.allocate(128);
+    
+    private CoderResult lastCoderResult;
+    private boolean endOfInput;
+    
+    /**
+     * Construct a new [EMAIL PROTECTED] ReaderInputStream}.
+     * 
+     * @param reader the target [EMAIL PROTECTED] Reader}
+     * @param charset the charset encoding
+     * @param bufferSize the size of the input buffer in number of characters
+     */
+    public ReaderInputStream(Reader reader, Charset charset, int bufferSize) {
+        this.reader = reader;
+        encoder = charset.newEncoder();
+        encoderIn = CharBuffer.allocate(bufferSize);
+        encoderIn.flip();
+    }
+    
+    /**
+     * Construct a new [EMAIL PROTECTED] ReaderInputStream} with a default 
input buffer size of
+     * 1024 characters.
+     * 
+     * @param reader the target [EMAIL PROTECTED] Reader}
+     * @param charset the charset encoding
+     */
+    public ReaderInputStream(Reader reader, Charset charset) {
+        this(reader, charset, DEFAULT_BUFFER_SIZE);
+    }
+    
+    /**
+     * Construct a new [EMAIL PROTECTED] ReaderInputStream}.
+     * 
+     * @param reader the target [EMAIL PROTECTED] Reader}
+     * @param charsetName the name of the charset encoding
+     * @param bufferSize the size of the input buffer in number of characters
+     */
+    public ReaderInputStream(Reader reader, String charsetName, int 
bufferSize) {
+        this(reader, Charset.forName(charsetName), bufferSize);
+    }
+    
+    /**
+     * Construct a new [EMAIL PROTECTED] ReaderInputStream} with a default 
input buffer size of
+     * 1024 characters.
+     * 
+     * @param reader the target [EMAIL PROTECTED] Reader}
+     * @param charsetName the name of the charset encoding
+     */
+    public ReaderInputStream(Reader reader, String charsetName) {
+        this(reader, charsetName, DEFAULT_BUFFER_SIZE);
+    }
+    
+    /**
+     * Construct a new [EMAIL PROTECTED] ReaderInputStream} that uses the 
default character encoding
+     * with a default input buffer size of 1024 characters.
+     * 
+     * @param reader the target [EMAIL PROTECTED] Reader}
+     */
+    public ReaderInputStream(Reader reader) {
+        this(reader, Charset.defaultCharset());
+    }
+    
+    /**
+     * Read the specified number of bytes into an array.
+     * 
+     * @param b the byte array to read into
+     * @param off the offset to start reading bytes into
+     * @param len the number of bytes to read
+     * @return the number of bytes read or <code>-1</code>
+     *         if the end of the stream has been reached
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int read = 0;
+        while (len > 0) {
+            if (encoderOut.position() > 0) {
+                encoderOut.flip();
+                int c = Math.min(encoderOut.remaining(), len);
+                encoderOut.get(b, off, c);
+                off += c;
+                len -= c;
+                read += c;
+                encoderOut.compact();
+            } else {
+                if (!endOfInput && (lastCoderResult == null || 
lastCoderResult.isUnderflow())) {
+                    encoderIn.compact();
+                    int position = encoderIn.position();
+                    // We don't use Reader#read(CharBuffer) here because it is 
more efficient
+                    // to write directly to the underlying char array (the 
default implementation
+                    // copies data to a temporary char array).
+                    int c = reader.read(encoderIn.array(), position, 
encoderIn.remaining());
+                    if (c == -1) {
+                        endOfInput = true;
+                    } else {
+                        encoderIn.position(position+c);
+                    }
+                    encoderIn.flip();
+                }
+                lastCoderResult = encoder.encode(encoderIn, encoderOut, 
endOfInput);
+                if (endOfInput && encoderOut.position() == 0) {
+                    break;
+                }
+            }
+        }
+        return read == 0 && endOfInput ? -1 : read;
+    }
+
+    /**
+     * Read the specified number of bytes into an array.
+     * 
+     * @param b the byte array to read into
+     * @return the number of bytes read or <code>-1</code>
+     *         if the end of the stream has been reached
+     */
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Read a single byte.
+     *
+     * @return either the byte read or <code>-1</code> if the end of the stream
+     *         has been reached
+     */
+    @Override
+    public int read() throws IOException {
+        byte[] b = new byte[1];
+        return read(b) == -1 ? -1 : b[0] & 0xFF;
+    }
+    
+    /**
+     * Close the stream. This method will cause the underlying [EMAIL 
PROTECTED] Reader}
+     * to be closed.
+     */
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java?rev=679558&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
 Thu Jul 24 14:46:31 2008
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.format;
+
+import java.io.Reader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.context.MessageContext;
+
+/**
+ * Message builder able to build messages from a character stream.
+ * This interface can be optionally implemented by [EMAIL PROTECTED] Builder}
+ * implementations that support building a message from a character
+ * stream.
+ * <p>
+ * The character stream can either be provided as a string or a
+ * [EMAIL PROTECTED] Reader} object. The caller should use a [EMAIL PROTECTED] 
Reader} object
+ * except if the content of the message is available as a string anyway.
+ * <p>
+ * This interface is currently used by the JMS transport to process
+ * [EMAIL PROTECTED] javax.jms.TextMessage} instances.
+ */
+public interface TextMessageBuilder extends Builder {
+    public OMElement processDocument(Reader reader, String contentType,
+            MessageContext messageContext) throws AxisFault;
+    
+    public OMElement processDocument(String content, String contentType,
+            MessageContext messageContext) throws AxisFault;
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java?rev=679558&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
 Thu Jul 24 14:46:31 2008
@@ -0,0 +1,64 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.format;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.context.MessageContext;
+
+/**
+ * Adapter to add the [EMAIL PROTECTED] TextMessageBuilder} interface to an
+ * existing [EMAIL PROTECTED] Builder}.
+ * It implements the [EMAIL PROTECTED] 
TextMessageBuilder#processDocument(Reader, String, MessageContext)}
+ * and [EMAIL PROTECTED] TextMessageBuilder#processDocument(String, String, 
MessageContext)} by converting
+ * the character stream to a byte stream using [EMAIL PROTECTED] 
ReaderInputStream}.
+ * 
+ * TODO: specifying encoding
+ */
+public class TextMessageBuilderAdapter implements TextMessageBuilder {
+    private final Builder builder;
+
+    public TextMessageBuilderAdapter(Builder builder) {
+        this.builder = builder;
+    }
+
+    public OMElement processDocument(InputStream inputStream, String 
contentType,
+                                     MessageContext messageContext) throws 
AxisFault {
+        return builder.processDocument(inputStream, contentType, 
messageContext);
+    }
+
+    public OMElement processDocument(Reader reader, String contentType,
+                                     MessageContext messageContext) throws 
AxisFault {
+        // TODO: check that using the default charset encoding and leaving the 
content type unaltered is a valid strategy
+        return processDocument(
+                new ReaderInputStream(reader, 
MessageContext.DEFAULT_CHAR_SET_ENCODING),
+                contentType, messageContext);
+    }
+
+    public OMElement processDocument(String content, String contentType,
+                                     MessageContext messageContext) throws 
AxisFault {
+        return processDocument(new StringReader(content), contentType, 
messageContext);
+    }
+}

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java?rev=679558&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
 Thu Jul 24 14:46:31 2008
@@ -0,0 +1,74 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.synapse.transport.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+
+import org.apache.commons.io.IOExceptionWithCause;
+
+/**
+ * Input stream that reads data from a JMS [EMAIL PROTECTED] BytesMessage}.
+ */
+public class BytesMessageInputStream extends InputStream {
+    private final BytesMessage message;
+
+    public BytesMessageInputStream(BytesMessage message) {
+        this.message = message;
+    }
+
+    @Override
+    public int read() throws IOException {
+        try {
+            return message.readByte() & 0xFF;
+        } catch (MessageEOFException ex) {
+            return -1;
+        } catch (JMSException ex) {
+            throw new IOExceptionWithCause(ex);
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (off == 0) {
+            try {
+                return message.readBytes(b, len);
+            } catch (JMSException ex) {
+                throw new IOExceptionWithCause(ex);
+            }
+        } else {
+            byte[] b2 = new byte[len];
+            int c = read(b2);
+            if (c > 0) {
+                System.arraycopy(b2, 0, b, off, c); 
+            }
+            return c;
+        }
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        try {
+            return message.readBytes(b);
+        } catch (JMSException ex) {
+            throw new IOExceptionWithCause(ex);
+        }
+    }
+}


Reply via email to