Author: veithen
Date: Thu Jul 24 14:46:31 2008
New Revision: 679558
URL: http://svn.apache.org/viewvc?rev=679558&view=rev
Log:
Added necessary infrastructure to implement SYNAPSE-304 for JMS transport (no
changes to transport yet).
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java?rev=679558&r1=679557&r2=679558&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/PlainTextBuilder.java
Thu Jul 24 14:46:31 2008
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.Reader;
import javax.xml.namespace.QName;
@@ -27,7 +28,6 @@
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.builder.Builder;
import org.apache.axis2.builder.BuilderUtil;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
@@ -42,19 +42,19 @@
* the text in a wrapper element. The name of the wrapper element can
* be configured as a service parameter (see {@link BaseConstants#WRAPPER_PARAM}).
* It defaults to {@link BaseConstants#DEFAULT_TEXT_WRAPPER}.
- * If the provided content type specifies a <tt>charset</tt> parameter
- * (e.g. <tt>text/plain; charset=ISO-8859-15</tt>) it is used to decode the
text.
- * Otherwise the default charset encoding specified by
+ * If the content is provided as an {@link InputStream} and the content type specifies a
+ * <tt>charset</tt> parameter (e.g. <tt>text/plain; charset=ISO-8859-15</tt>),
+ * this information is used to decode the text.
+ * If the content is provided as an {@link InputStream} but no <tt>charset</tt> parameter
+ * is specified on the content type, the default charset encoding specified by
* {@link MessageContext#DEFAULT_CHAR_SET_ENCODING} is used.
*/
-public class PlainTextBuilder implements Builder {
- public OMElement processDocument(InputStream inputStream,
- String contentType,
- MessageContext msgContext) throws
AxisFault {
-
+public class PlainTextBuilder implements TextMessageBuilder {
+ private OMElement buildMessage(String textPayload, MessageContext
msgContext) {
QName wrapperQName = BaseConstants.DEFAULT_TEXT_WRAPPER;
if (msgContext.getAxisService() != null) {
- Parameter wrapperParam =
msgContext.getAxisService().getParameter(BaseConstants.WRAPPER_PARAM);
+ Parameter wrapperParam
+ =
msgContext.getAxisService().getParameter(BaseConstants.WRAPPER_PARAM);
if (wrapperParam != null) {
wrapperQName =
BaseUtils.getQNameFromString(wrapperParam.getValue());
}
@@ -62,14 +62,35 @@
OMFactory factory = OMAbstractFactory.getOMFactory();
OMElement wrapper = factory.createOMElement(wrapperQName, null);
+ wrapper.addChild(factory.createOMText(textPayload));
+ return wrapper;
+ }
+
+ public OMElement processDocument(InputStream inputStream,
+ String contentType,
+ MessageContext msgContext) throws
AxisFault {
+
String charSetEnc = BuilderUtil.getCharSetEncoding(contentType);
- String textPayload;
try {
- textPayload = IOUtils.toString(inputStream, charSetEnc);
+ return buildMessage(IOUtils.toString(inputStream, charSetEnc),
msgContext);
} catch (IOException ex) {
throw new AxisFault("Unable to read message payload", ex);
}
- wrapper.addChild(factory.createOMText(textPayload));
- return wrapper;
+ }
+
+ public OMElement processDocument(Reader reader,
+ String contentType,
+ MessageContext msgContext) throws
AxisFault {
+ try {
+ return buildMessage(IOUtils.toString(reader), msgContext);
+ } catch (IOException ex) {
+ throw new AxisFault("Unable to read message payload", ex);
+ }
+ }
+
+ public OMElement processDocument(String content,
+ String contentType,
+ MessageContext msgContext) throws
AxisFault {
+ return buildMessage(content, msgContext);
}
}
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java?rev=679558&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
(added)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/ReaderInputStream.java
Thu Jul 24 14:46:31 2008
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.synapse.format;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+
/**
 * {@link InputStream} implementation that reads a character stream from a {@link Reader}
 * and transforms it to a byte stream using a specified charset encoding. The stream
 * is transformed using a {@link CharsetEncoder} object, guaranteeing that all charset
 * encodings supported by the JRE are handled correctly. In particular for charsets such as
 * UTF-16, the implementation ensures that one and only one byte order marker
 * is produced.
 * <p>
 * Characters that are malformed (e.g. unpaired surrogates) or that cannot be mapped to the
 * target charset are replaced by the encoder's replacement byte sequence, which is the same
 * policy {@link java.io.OutputStreamWriter} applies. When the end of the character stream is
 * reached the encoder is flushed, so that charsets which need to emit trailing bytes
 * (e.g. a final escape sequence) produce complete output.
 * <p>
 * Since in general it is not possible to predict the number of characters to be read from the
 * {@link Reader} to satisfy a read request on the {@link ReaderInputStream}, all reads from
 * the {@link Reader} are buffered. There is therefore no well defined correlation
 * between the current position of the {@link Reader} and that of the {@link ReaderInputStream}.
 * This also implies that in general there is no need to wrap the underlying {@link Reader}
 * in a {@link java.io.BufferedReader}.
 * <p>
 * {@link ReaderInputStream} implements the inverse transformation of
 * {@link java.io.InputStreamReader}; in the following example, reading from <tt>in2</tt>
 * would return the same byte sequence as reading from <tt>in</tt> (provided that the initial
 * byte sequence is legal with respect to the charset encoding):
 * <pre>
 * InputStream in = ...
 * Charset cs = ...
 * InputStreamReader reader = new InputStreamReader(in, cs);
 * ReaderInputStream in2 = new ReaderInputStream(reader, cs);</pre>
 * {@link ReaderInputStream} implements the same transformation as
 * {@link java.io.OutputStreamWriter}, except that the control flow is reversed: both classes
 * transform a character stream into a byte stream, but {@link java.io.OutputStreamWriter}
 * pushes data to the underlying stream, while {@link ReaderInputStream} pulls it from the
 * underlying stream.
 * <p>
 * Note that while there are use cases where there is no alternative to using
 * this class, very often the need to use this class is an indication of a flaw
 * in the design of the code. This class is typically used in situations where an existing
 * API only accepts an {@link InputStream}, but where the most natural way to produce the data
 * is as a character stream, i.e. by providing a {@link Reader} instance. An example of a
 * situation where this problem may appear is when implementing the
 * {@link javax.activation.DataSource} interface from the Java Activation Framework.
 * <p>
 * Given the fact that the {@link Reader} class doesn't provide any way to predict whether the
 * next read operation will block or not, it is not possible to provide a meaningful
 * implementation of the {@link InputStream#available()} method. A call to this method
 * will always return 0. Also, this class doesn't support {@link InputStream#mark(int)}.
 * <p>
 * Instances of {@link ReaderInputStream} are not thread safe.
 */
public class ReaderInputStream extends InputStream {
    private static final int DEFAULT_BUFFER_SIZE = 1024;

    /**
     * Smallest usable capacity of the input buffer: a surrogate pair must fit into the
     * buffer in one piece, otherwise the encoder could never consume it.
     */
    private static final int MIN_BUFFER_SIZE = 2;

    private final Reader reader;
    private final CharsetEncoder encoder;

    /**
     * CharBuffer used as input for the encoder. It should be reasonably
     * large as we read data from the underlying Reader into this buffer.
     * Between calls it is kept in "read mode" (flipped).
     */
    private final CharBuffer encoderIn;

    /**
     * ByteBuffer used as output for the encoder. This buffer can be small
     * as it is only used to transfer data from the encoder to the
     * buffer provided by the caller. Between calls it is kept in "write mode",
     * i.e. its position is the number of pending bytes.
     */
    private final ByteBuffer encoderOut = ByteBuffer.allocate(128);

    /** Result of the most recent call to {@link CharsetEncoder#encode}. */
    private CoderResult lastCoderResult;

    /** Set once the underlying reader has reported end of stream. */
    private boolean endOfInput;

    /** Set once the encoder has consumed all input, including the end-of-input signal. */
    private boolean inputEncoded;

    /** Set once the encoder has been flushed completely; no more bytes will be produced. */
    private boolean finished;

    /**
     * Construct a new {@link ReaderInputStream}.
     *
     * @param reader the target {@link Reader}
     * @param charset the charset encoding
     * @param bufferSize the size of the input buffer in number of characters; values
     *                   smaller than 2 are rounded up to 2
     * @throws IllegalArgumentException if <code>bufferSize</code> is negative
     */
    public ReaderInputStream(Reader reader, Charset charset, int bufferSize) {
        if (bufferSize < 0) {
            throw new IllegalArgumentException("Negative buffer size: " + bufferSize);
        }
        this.reader = reader;
        // With the default REPORT action an unencodable character would leave the encoder
        // stuck on the same input forever; replace such characters instead.
        encoder = charset.newEncoder()
                .onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE)
                .onUnmappableCharacter(java.nio.charset.CodingErrorAction.REPLACE);
        encoderIn = CharBuffer.allocate(Math.max(bufferSize, MIN_BUFFER_SIZE));
        // Start in read mode with nothing to read.
        encoderIn.flip();
    }

    /**
     * Construct a new {@link ReaderInputStream} with a default input buffer size of
     * 1024 characters.
     *
     * @param reader the target {@link Reader}
     * @param charset the charset encoding
     */
    public ReaderInputStream(Reader reader, Charset charset) {
        this(reader, charset, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Construct a new {@link ReaderInputStream}.
     *
     * @param reader the target {@link Reader}
     * @param charsetName the name of the charset encoding
     * @param bufferSize the size of the input buffer in number of characters
     */
    public ReaderInputStream(Reader reader, String charsetName, int bufferSize) {
        this(reader, Charset.forName(charsetName), bufferSize);
    }

    /**
     * Construct a new {@link ReaderInputStream} with a default input buffer size of
     * 1024 characters.
     *
     * @param reader the target {@link Reader}
     * @param charsetName the name of the charset encoding
     */
    public ReaderInputStream(Reader reader, String charsetName) {
        this(reader, charsetName, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Construct a new {@link ReaderInputStream} that uses the default character encoding
     * with a default input buffer size of 1024 characters.
     *
     * @param reader the target {@link Reader}
     */
    public ReaderInputStream(Reader reader) {
        this(reader, Charset.defaultCharset());
    }

    /**
     * Produce more bytes in {@link #encoderOut}: refill the input buffer from the reader
     * when the encoder has run out of characters, encode, and finally flush the encoder
     * once all input has been consumed.
     *
     * @throws IOException if the underlying reader fails or the encoder reports an error
     */
    private void fillOutputBuffer() throws IOException {
        if (!inputEncoded) {
            if (!endOfInput && (lastCoderResult == null || lastCoderResult.isUnderflow())) {
                encoderIn.compact();
                int position = encoderIn.position();
                // We don't use Reader#read(CharBuffer) here because it is more efficient
                // to write directly to the underlying char array (the default implementation
                // copies data to a temporary char array).
                int c = reader.read(encoderIn.array(), position, encoderIn.remaining());
                if (c == -1) {
                    endOfInput = true;
                } else {
                    encoderIn.position(position + c);
                }
                encoderIn.flip();
            }
            lastCoderResult = encoder.encode(encoderIn, encoderOut, endOfInput);
            if (lastCoderResult.isError()) {
                // Not expected with the REPLACE actions, but never loop on an error.
                lastCoderResult.throwException();
            }
            if (!(endOfInput && lastCoderResult.isUnderflow())) {
                // Either more input is needed or the output buffer must be drained first.
                return;
            }
            inputEncoded = true;
        }
        // An overflow here means the output buffer is full; flush again after draining it.
        if (encoder.flush(encoderOut).isUnderflow()) {
            finished = true;
        }
    }

    /**
     * Read the specified number of bytes into an array. The method keeps reading from the
     * underlying {@link Reader} until <code>len</code> bytes have been produced or the end
     * of the stream has been reached.
     *
     * @param b the byte array to read into
     * @param off the offset to start reading bytes into
     * @param len the number of bytes to read
     * @return the number of bytes read or <code>-1</code>
     *         if the end of the stream has been reached; <code>0</code> if
     *         <code>len</code> is zero
     * @throws IndexOutOfBoundsException if <code>off</code> or <code>len</code> is negative
     *         or <code>len</code> is greater than <code>b.length - off</code>
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException("b");
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", array length=" + b.length);
        }
        if (len == 0) {
            return 0;
        }
        int read = 0;
        while (len > 0) {
            if (encoderOut.position() > 0) {
                // Hand pending bytes over to the caller.
                encoderOut.flip();
                int c = Math.min(encoderOut.remaining(), len);
                encoderOut.get(b, off, c);
                off += c;
                len -= c;
                read += c;
                encoderOut.compact();
            } else if (finished) {
                break;
            } else {
                fillOutputBuffer();
            }
        }
        // len was positive, so nothing read means the stream is exhausted.
        return read == 0 ? -1 : read;
    }

    /**
     * Read bytes into an array, filling it completely unless the end of the stream
     * is reached first.
     *
     * @param b the byte array to read into
     * @return the number of bytes read or <code>-1</code>
     *         if the end of the stream has been reached
     */
    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Read a single byte.
     *
     * @return either the byte read (as a value between 0 and 255) or <code>-1</code>
     *         if the end of the stream has been reached
     */
    @Override
    public int read() throws IOException {
        byte[] b = new byte[1];
        return read(b) == -1 ? -1 : b[0] & 0xFF;
    }

    /**
     * Close the stream. This method will cause the underlying {@link Reader}
     * to be closed.
     */
    @Override
    public void close() throws IOException {
        reader.close();
    }
}
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java?rev=679558&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
(added)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilder.java
Thu Jul 24 14:46:31 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.format;
+
+import java.io.Reader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.context.MessageContext;
+
+/**
+ * Message builder able to build messages from a character stream.
+ * This interface can be optionally implemented by [EMAIL PROTECTED] Builder}
+ * implementations that support building a message from a character
+ * stream.
+ * <p>
+ * The character stream can either be provided as a string or a
+ * [EMAIL PROTECTED] Reader} object. The caller should use a [EMAIL PROTECTED]
Reader} object
+ * except if the content of the message is available as a string anyway.
+ * <p>
+ * This interface is currently used by the JMS transport to process
+ * [EMAIL PROTECTED] javax.jms.TextMessage} instances.
+ */
+public interface TextMessageBuilder extends Builder {
+ public OMElement processDocument(Reader reader, String contentType,
+ MessageContext messageContext) throws AxisFault;
+
+ public OMElement processDocument(String content, String contentType,
+ MessageContext messageContext) throws AxisFault;
+}
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java?rev=679558&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
(added)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/format/TextMessageBuilderAdapter.java
Thu Jul 24 14:46:31 2008
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.format;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.context.MessageContext;
+
+/**
+ * Adapter to add the [EMAIL PROTECTED] TextMessageBuilder} interface to an
+ * existing [EMAIL PROTECTED] Builder}.
+ * It implements the [EMAIL PROTECTED]
TextMessageBuilder#processDocument(Reader, String, MessageContext)}
+ * and [EMAIL PROTECTED] TextMessageBuilder#processDocument(String, String,
MessageContext)} by converting
+ * the character stream to a byte stream using [EMAIL PROTECTED]
ReaderInputStream}.
+ *
+ * TODO: specifying encoding
+ */
+public class TextMessageBuilderAdapter implements TextMessageBuilder {
+ private final Builder builder;
+
+ public TextMessageBuilderAdapter(Builder builder) {
+ this.builder = builder;
+ }
+
+ public OMElement processDocument(InputStream inputStream, String
contentType,
+ MessageContext messageContext) throws
AxisFault {
+ return builder.processDocument(inputStream, contentType,
messageContext);
+ }
+
+ public OMElement processDocument(Reader reader, String contentType,
+ MessageContext messageContext) throws
AxisFault {
+ // TODO: check that using the default charset encoding and leaving the
content type unaltered is a valid strategy
+ return processDocument(
+ new ReaderInputStream(reader,
MessageContext.DEFAULT_CHAR_SET_ENCODING),
+ contentType, messageContext);
+ }
+
+ public OMElement processDocument(String content, String contentType,
+ MessageContext messageContext) throws
AxisFault {
+ return processDocument(new StringReader(content), contentType,
messageContext);
+ }
+}
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java?rev=679558&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
(added)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/BytesMessageInputStream.java
Thu Jul 24 14:46:31 2008
@@ -0,0 +1,74 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.synapse.transport.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+
+import org.apache.commons.io.IOExceptionWithCause;
+
+/**
+ * Input stream that reads data from a JMS [EMAIL PROTECTED] BytesMessage}.
+ */
+public class BytesMessageInputStream extends InputStream {
+ private final BytesMessage message;
+
+ public BytesMessageInputStream(BytesMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return message.readByte() & 0xFF;
+ } catch (MessageEOFException ex) {
+ return -1;
+ } catch (JMSException ex) {
+ throw new IOExceptionWithCause(ex);
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (off == 0) {
+ try {
+ return message.readBytes(b, len);
+ } catch (JMSException ex) {
+ throw new IOExceptionWithCause(ex);
+ }
+ } else {
+ byte[] b2 = new byte[len];
+ int c = read(b2);
+ if (c > 0) {
+ System.arraycopy(b2, 0, b, off, c);
+ }
+ return c;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return message.readBytes(b);
+ } catch (JMSException ex) {
+ throw new IOExceptionWithCause(ex);
+ }
+ }
+}