Author: davsclaus
Date: Thu Jun 19 23:55:24 2008
New Revision: 669804
URL: http://svn.apache.org/viewvc?rev=669804&view=rev
Log:
CAMEL-421: Polished stream component. Added encoding option. Added unit tests.
Added:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java
(with props)
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java
(with props)
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java
(contents, props changed)
- copied, changed from r669774,
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
Modified:
activemq/camel/trunk/components/camel-stream/pom.xml
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
Modified: activemq/camel/trunk/components/camel-stream/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/pom.xml?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-stream/pom.xml (original)
+++ activemq/camel/trunk/components/camel-stream/pom.xml Thu Jun 19 23:55:24
2008
@@ -61,6 +61,11 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- to allow Spring annotations (jmx) to be tested -->
<dependency>
<groupId>org.springframework</groupId>
@@ -76,11 +81,6 @@
<optional>true</optional>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamComponent.java
Thu Jun 19 23:55:24 2008
@@ -26,6 +26,9 @@
*/
public class StreamComponent extends DefaultComponent<Exchange> {
+ // TODO: remove file and url support in this component. Will be removed in
Camel 2.0
+ // (Should use other components for such needs.)
+
@Override
protected Endpoint<Exchange> createEndpoint(String uri, String remaining,
Map parameters)
throws Exception {
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
Thu Jun 19 23:55:24 2008
@@ -24,19 +24,20 @@
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.Message;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
- * Consumer that can read from any stream
+ * Consumer that can read from streams
*/
public class StreamConsumer extends DefaultConsumer<Exchange> {
@@ -72,22 +73,18 @@
@Override
public void doStop() throws Exception {
- if (inputStream != null) {
- inputStream.close();
- }
+ // important: do not close the stream as it will close the standard
system.in etc.
super.doStop();
}
private void readFromStream() throws Exception {
- BufferedReader br = new BufferedReader(new
InputStreamReader(inputStream));
+ Charset charset = endpoint.getCharset();
+ BufferedReader br = new BufferedReader(new
InputStreamReader(inputStream, charset));
String line;
- try {
- while ((line = br.readLine()) != null) {
- consumeLine(line);
- }
- } finally {
- br.close();
+ while ((line = br.readLine()) != null) {
+ consumeLine(line);
}
+ // important: do not close the reader as it will close the standard
system.in etc.
}
private void consumeLine(Object line) throws Exception {
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
Thu Jun 19 23:55:24 2008
@@ -16,18 +16,25 @@
*/
package org.apache.camel.component.stream;
+import java.nio.charset.Charset;
+
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class StreamEndpoint extends DefaultEndpoint<Exchange> {
+ private static final transient Log LOG =
LogFactory.getLog(StreamEndpoint.class);
+
private String uri;
private String file;
private String url;
private long delay;
+ private String encoding;
public StreamEndpoint(String endpointUri, Component component) throws
Exception {
super(endpointUri, component);
@@ -58,6 +65,9 @@
return file;
}
+ /**
+ * @deprecated use camel-file component. Will be removed in Camel 2.0
+ */
public void setFile(String file) {
this.file = file;
}
@@ -66,6 +76,9 @@
return url;
}
+ /**
+ * @deprecated use camel-jetty or camel-http component. Will be removed in
Camel 2.0
+ */
public void setUrl(String url) {
this.url = url;
}
@@ -77,4 +90,30 @@
public void setDelay(long delay) {
this.delay = delay;
}
+
+ public String getEncoding() {
+ return encoding;
+ }
+
+ public void setEncoding(String encoding) {
+ this.encoding = encoding;
+ }
+
+ // Implementations
+ //-------------------------------------------------------------------------
+
+ Charset getCharset() {
+ if (encoding == null) {
+ encoding = Charset.defaultCharset().name();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No encoding parameter using default charset: " +
encoding);
+ }
+ }
+ if (!Charset.isSupported(encoding)) {
+ throw new IllegalArgumentException("The encoding: " + encoding + "
is not supported");
+ }
+
+ return Charset.forName(encoding);
+ }
+
}
Modified:
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
Thu Jun 19 23:55:24 2008
@@ -22,8 +22,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.net.URL;
import java.net.URLConnection;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
@@ -33,6 +35,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+/**
+ * Producer that can write to streams
+ */
public class StreamProducer extends DefaultProducer<Exchange> {
private static final transient Log LOG =
LogFactory.getLog(StreamProducer.class);
@@ -52,9 +57,7 @@
@Override
public void doStop() throws Exception {
- if (outputStream != null) {
- outputStream.close();
- }
+ // important: do not close the stream as it will close the standard
system.out etc.
super.doStop();
}
@@ -96,7 +99,8 @@
if (o != null && o instanceof OutputStream) {
return (OutputStream)o;
} else {
- throw new CamelExchangeException("Expected OutputStream in
header('stream'), found: " + o, exchange);
+ throw new CamelExchangeException("Expected OutputStream in
header('stream'), found: " + o,
+ exchange);
}
}
@@ -110,24 +114,27 @@
Thread.sleep(ms);
}
- private void writeToStream(Exchange exchange) throws IOException {
+ private void writeToStream(Exchange exchange) throws IOException,
CamelExchangeException {
Object body = exchange.getIn().getBody();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing " + body + " to " + outputStream);
- }
if (body instanceof String) {
- LOG.debug("in text buffered mode");
- BufferedWriter bw = new BufferedWriter(new
OutputStreamWriter(outputStream));
- try {
- bw.write((String)body);
- bw.write("\n");
- bw.flush();
- } finally {
- bw.close();
+ Charset charset = endpoint.getCharset();
+ Writer writer = new OutputStreamWriter(outputStream, charset);
+ BufferedWriter bw = new BufferedWriter(writer);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing as text: " + body + " to " + outputStream +
" using encoding:" + charset);
+ }
+ bw.write((String)body);
+ bw.write("\n");
+ bw.flush();
+ // important: do not close the writer as it will close the
standard system.out etc.
+ } else if (body instanceof byte[]) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing as text: " + body + " to " + outputStream);
}
- } else {
- LOG.debug("in binary stream mode");
outputStream.write((byte[])body);
+ } else {
+ throw new CamelExchangeException("The body is neither a String or
byte array. "
+ + "Can not write body to output stream", exchange);
}
}
Added:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java?rev=669804&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java
(added)
+++
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java
Thu Jun 19 23:55:24 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Unit test for delay option.
+ */
+public class StreamDelayTest extends ContextTestSupport {
+
+ public void testStringContent() throws Exception {
+ long start = System.currentTimeMillis();
+ template.sendBody("direct:in", "Hello Text World\n");
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Delay should be around 2 sec: " + delta, delta > 1900 &&
delta < 3000);
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:in").to("stream:out?delay=2000");
+ }
+ };
+ }
+
+}
Propchange:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamDelayTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java?rev=669804&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java
(added)
+++
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java
Thu Jun 19 23:55:24 2008
@@ -0,0 +1,26 @@
+package org.apache.camel.component.stream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Unit test for encoding option
+ */
+public class StreamEncodingTest extends ContextTestSupport {
+
+ public void testStringContent() throws Exception {
+ // include a UTF-8 char in the text \u0E08 is a Thai elephant
+ String body = "Hello Thai Elephant \u0E08";
+
+ template.sendBody("direct:in", body);
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:in").to("stream:out?encoding=UTF-8");
+ }
+ };
+ }
+
+}
Propchange:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamEncodingTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamRouteBuilderTest.java
Thu Jun 19 23:55:24 2008
@@ -22,18 +22,18 @@
public class StreamRouteBuilderTest extends ContextTestSupport {
public void testStringContent() {
- template.sendBody("direct:start", "<content/>");
+ template.sendBody("direct:start", "this is text\n");
}
public void testBinaryContent() {
- template.sendBody("direct:start", new byte[] {1, 2, 3, 4});
+ template.sendBody("direct:start", "This is bytes\n".getBytes());
}
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
from("direct:start").setHeader("stream", constant(System.out))
- .to("stream:err", "stream:out",
"stream:file?file=/tmp/foo", "stream:header");
+ .to("stream:err", "stream:out", "stream:header");
}
};
}
Copied:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java
(from r669774,
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java?p2=activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java&p1=activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java&r1=669774&r2=669804&rev=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java
Thu Jun 19 23:55:24 2008
@@ -1,58 +1,43 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.stream;
-
-import java.io.OutputStream;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Producer;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.builder.RouteBuilder;
-
-/**
- * Unit test for System.out
- */
-public class StreamSystemOutTest extends ContextTestSupport {
-
- public void testStringContent() throws Exception {
- Endpoint endpoint = context.getEndpoint("direct:in");
- Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
- exchange.getIn().setBody("Hello World\n");
- Producer producer = endpoint.createProducer();
- producer.start();
- producer.process(exchange);
- producer.stop();
-
- //template.sendBody("direct:in", "Hello");
- System.out.println("End of test");
- }
-
-/* public void testBinaryContent() {
- template.sendBody("direct:in", "World".getBytes());
- }*/
-
- protected RouteBuilder createRouteBuilder() {
- return new RouteBuilder() {
- public void configure() {
- from("direct:in").to("stream:out");
- }
- };
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Unit test for System.err
+ */
+public class StreamSystemErrTest extends ContextTestSupport {
+
+ public void testStringContent() throws Exception {
+ template.sendBody("direct:in", "Hello Text World\n");
+ }
+
+ public void testBinaryContent() {
+ template.sendBody("direct:in", "Hello Bytes World\n".getBytes());
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:in").to("stream:err");
+ }
+ };
+ }
+
+}
Propchange:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemErrTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java?rev=669804&r1=669803&r2=669804&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
(original)
+++
activemq/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
Thu Jun 19 23:55:24 2008
@@ -16,13 +16,7 @@
*/
package org.apache.camel.component.stream;
-import java.io.OutputStream;
-
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Producer;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
/**
@@ -30,22 +24,14 @@
*/
public class StreamSystemOutTest extends ContextTestSupport {
+ // START SNIPPET: e1
public void testStringContent() throws Exception {
- Endpoint endpoint = context.getEndpoint("direct:in");
- Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
- exchange.getIn().setBody("Hello World\n");
- Producer producer = endpoint.createProducer();
- producer.start();
- producer.process(exchange);
- producer.stop();
-
- //template.sendBody("direct:in", "Hello");
- System.out.println("End of test");
+ template.sendBody("direct:in", "Hello Text World\n");
}
-/* public void testBinaryContent() {
- template.sendBody("direct:in", "World".getBytes());
- }*/
+ public void testBinaryContent() {
+ template.sendBody("direct:in", "Hello Bytes World\n".getBytes());
+ }
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@@ -54,5 +40,6 @@
}
};
}
-
+ // END SNIPPET: e1
+
}