CAMEL-6899: The stream consumer with groupLines should send the last message on EOL even if the groupLines limit was not hit; otherwise the last group is never sent out.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ac70d071 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ac70d071 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ac70d071 Branch: refs/heads/camel-2.12.x Commit: ac70d071d7aa5ff4955e7f37e49042beffed7347 Parents: 8feba84 Author: Claus Ibsen <[email protected]> Authored: Tue Nov 12 10:37:07 2013 +0100 Committer: Claus Ibsen <[email protected]> Committed: Tue Nov 12 10:37:22 2013 +0100 ---------------------------------------------------------------------- .../camel/component/stream/StreamConsumer.java | 14 ++++-- .../camel/component/stream/StreamProducer.java | 1 + .../StreamGroupLinesLastStrategyTest.java | 51 ++++++++++++++++++++ .../stream/StreamGroupLinesStrategyTest.java | 3 +- .../component/stream/StreamGroupLinesTest.java | 1 + 5 files changed, 64 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ac70d071/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java index b05ab0b..1e8f9a9 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java @@ -159,6 +159,8 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { processLine(line); } } + // EOL so trigger any + processLine(null); } // important: do not close the reader as it will close the standard system.in etc. 
} @@ -167,13 +169,17 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { * Strategy method for processing the line */ protected synchronized void processLine(String line) throws Exception { + boolean last = line == null; + if (endpoint.getGroupLines() > 0) { // remember line - lines.add(line); + if (line != null) { + lines.add(line); + } // should we flush lines? - if (lines.size() >= endpoint.getGroupLines()) { - // spit out lines + if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) { + // spit out lines as we hit the size, or it was the last Exchange exchange = endpoint.createExchange(); // create message with the lines @@ -187,7 +193,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { getProcessor().process(exchange); } - } else { + } else if (!last) { // single line Exchange exchange = endpoint.createExchange(); http://git-wip-us.apache.org/repos/asf/camel/blob/ac70d071/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java index c8aac89..e194774 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java @@ -99,6 +99,7 @@ public class StreamProducer extends DefaultProducer { LOG.debug("About to write to file: {}", fileName); File f = new File(fileName); // will create a new file if missing or append to existing + f.getParentFile().mkdirs(); f.createNewFile(); return new FileOutputStream(f, true); } 
http://git-wip-us.apache.org/repos/asf/camel/blob/ac70d071/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java new file mode 100644 index 0000000..59ad264 --- /dev/null +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.stream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class StreamGroupLinesLastStrategyTest extends StreamGroupLinesStrategyTest { + + @Test + public void testGroupLines() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(2); + mock.setAssertPeriod(1000); + + assertMockEndpointsSatisfied(); + + Object result = mock.getExchanges().get(0).getIn().getBody(); + assertEquals("Get a wrong result.", "A\nB\nC\nD\n", result); + + // we did not have 4 lines but since its the last it was triggered anyway + Object result2 = mock.getExchanges().get(1).getIn().getBody(); + assertEquals("Get a wrong result.", "E\nF\n", result2); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("stream:file?fileName=target/stream/streamfile.txt&groupLines=4&groupStrategy=#myGroupStrategy").to("mock:result"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/ac70d071/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java index c4d000d..047319e 100644 --- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java @@ -48,16 +48,15 @@ public class StreamGroupLinesStrategyTest extends StreamGroupLinesTest { public void testGroupLines() throws Exception { MockEndpoint mock = 
getMockEndpoint("mock:result"); mock.expectedMessageCount(2); + mock.setAssertPeriod(1000); assertMockEndpointsSatisfied(); Object result = mock.getExchanges().get(0).getIn().getBody(); assertEquals("Get a wrong result.", "A\nB\nC\n", result); - Object result2 = mock.getExchanges().get(1).getIn().getBody(); assertEquals("Get a wrong result.", "D\nE\nF\n", result2); - } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/ac70d071/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java index 63b9108..09d92a6 100644 --- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java @@ -59,6 +59,7 @@ public class StreamGroupLinesTest extends CamelTestSupport { public void testGroupLines() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(2); + mock.setAssertPeriod(1000); assertMockEndpointsSatisfied();
