CAMEL-6899: camel-stream now adds headers with the index and a complete flag to each message, so you can tell, for example, whether a message is the last one and how many lines there were.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/23854b2d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/23854b2d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/23854b2d Branch: refs/heads/master Commit: 23854b2d52c9c7f8ed5420e3e52dc3657c3400e2 Parents: 3f07f04 Author: Claus Ibsen <[email protected]> Authored: Tue Nov 12 11:01:33 2013 +0100 Committer: Claus Ibsen <[email protected]> Committed: Tue Nov 12 11:01:33 2013 +0100 ---------------------------------------------------------------------- .../camel/component/stream/StreamConstants.java | 26 +++++++++++ .../camel/component/stream/StreamConsumer.java | 45 ++++++++++---------- .../camel/component/stream/StreamEndpoint.java | 9 ++++ .../component/stream/ScanStreamFileTest.java | 5 +++ .../StreamGroupLinesLastStrategyTest.java | 4 ++ .../stream/StreamGroupLinesStrategyTest.java | 4 ++ .../component/stream/StreamGroupLinesTest.java | 4 ++ 7 files changed, 75 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java new file mode 100644 index 0000000..b338c43 --- /dev/null +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.stream; + +public final class StreamConstants { + + public static final String STREAM_INDEX = "CamelStreamIndex"; + public static final String STREAM_COMPLETE = "CamelStreamComplete"; + + private StreamConstants() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java index 1e8f9a9..ed33a0d 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java @@ -32,10 +32,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; -import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; @@ -122,6 +120,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { } private void 
readFromStream() throws Exception { + long index = 0; String line; BufferedReader br = initializeStream(); @@ -132,7 +131,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { LOG.trace("Read line: {}", line); boolean eos = line == null; if (!eos && isRunAllowed()) { - processLine(line); + index = processLine(line, false, index); } else if (eos && isRunAllowed() && endpoint.isRetry()) { //try and re-open stream br = initializeStream(); @@ -147,20 +146,29 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { } else { // regular read stream once until end of stream boolean eos = false; + String line2 = null; while (!eos && isRunAllowed()) { if (endpoint.getPromptMessage() != null) { doPromptMessage(); } - line = br.readLine(); + if (line2 == null) { + line = br.readLine(); + } else { + line = line2; + } LOG.trace("Read line: {}", line); + eos = line == null; if (!eos && isRunAllowed()) { - processLine(line); + // read ahead if there is more data + line2 = br.readLine(); + boolean last = line2 == null; + index = processLine(line, last, index); } } // EOL so trigger any - processLine(null); + processLine(null, true, index); } // important: do not close the reader as it will close the standard system.in etc. } @@ -168,9 +176,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { /** * Strategy method for processing the line */ - protected synchronized void processLine(String line) throws Exception { - boolean last = line == null; - + protected synchronized long processLine(String line, boolean last, long index) throws Exception { if (endpoint.getGroupLines() > 0) { // remember line if (line != null) { @@ -180,29 +186,24 @@ public class StreamConsumer extends DefaultConsumer implements Runnable { // should we flush lines? 
if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) { // spit out lines as we hit the size, or it was the last - Exchange exchange = endpoint.createExchange(); - - // create message with the lines - Message msg = new DefaultMessage(); List<String> copy = new ArrayList<String>(lines); - msg.setBody(endpoint.getGroupStrategy().groupLines(copy)); - exchange.setIn(msg); + Object body = endpoint.getGroupStrategy().groupLines(copy); + // remember to inc index when we create an exchange + Exchange exchange = endpoint.createExchange(body, index++, last); // clear lines lines.clear(); getProcessor().process(exchange); } - } else if (!last) { + } else if (line != null) { // single line - Exchange exchange = endpoint.createExchange(); - - Message msg = new DefaultMessage(); - msg.setBody(line); - exchange.setIn(msg); - + // remember to inc index when we create an exchange + Exchange exchange = endpoint.createExchange(line, index++, last); getProcessor().process(exchange); } + + return index; } /** http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java index c091e65..9fc46f3 100644 --- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java +++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java @@ -20,6 +20,7 @@ import java.nio.charset.Charset; import org.apache.camel.Component; import org.apache.camel.Consumer; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; @@ -70,6 +71,14 @@ public class StreamEndpoint extends 
DefaultEndpoint { return true; } + protected Exchange createExchange(Object body, long index, boolean last) { + Exchange exchange = createExchange(); + exchange.getIn().setBody(body); + exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index); + exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last); + return exchange; + } + // Properties //------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java index 2f7e89b..60096f0 100644 --- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java @@ -48,6 +48,11 @@ public class ScanStreamFileTest extends CamelTestSupport { public void testScanFile() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMinimumMessageCount(2); + mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0); + mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false); + mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1); + // a scanStream=true is never finished + mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false); FileOutputStream fos = new FileOutputStream(file); try { http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java index 59ad264..279bae0 100644 --- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java @@ -27,6 +27,10 @@ public class StreamGroupLinesLastStrategyTest extends StreamGroupLinesStrategyTe MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(2); mock.setAssertPeriod(1000); + mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0); + mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false); + mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1); + mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(true); assertMockEndpointsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java index 047319e..4352e13 100644 --- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java @@ -49,6 +49,10 @@ public class StreamGroupLinesStrategyTest extends StreamGroupLinesTest { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(2); mock.setAssertPeriod(1000); + 
mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0); + mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false); + mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1); + mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(true); assertMockEndpointsSatisfied(); http://git-wip-us.apache.org/repos/asf/camel/blob/23854b2d/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java index 09d92a6..8fe19fc 100644 --- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java +++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java @@ -60,6 +60,10 @@ public class StreamGroupLinesTest extends CamelTestSupport { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(2); mock.setAssertPeriod(1000); + mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0); + mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false); + mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1); + mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(true); assertMockEndpointsSatisfied();
