Updated Branches:
  refs/heads/camel-2.11.x 1416ec7e8 -> 96c1ca4ea
  refs/heads/camel-2.12.x 8feba8474 -> 797399877
  refs/heads/master ceb28aea5 -> 23854b2d5


CAMEL-6899: stream consumer with groupLines should send the last group when 
the end of the stream is reached before the groupLines limit is hit; 
otherwise the last group is never sent out.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f07f04f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f07f04f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f07f04f

Branch: refs/heads/master
Commit: 3f07f04fa8da745686fedc3757521e223dd87dc1
Parents: ceb28ae
Author: Claus Ibsen <[email protected]>
Authored: Tue Nov 12 10:37:07 2013 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Tue Nov 12 10:37:07 2013 +0100

----------------------------------------------------------------------
 .../camel/component/stream/StreamConsumer.java  | 14 ++++--
 .../camel/component/stream/StreamProducer.java  |  1 +
 .../StreamGroupLinesLastStrategyTest.java       | 51 ++++++++++++++++++++
 .../stream/StreamGroupLinesStrategyTest.java    |  3 +-
 .../component/stream/StreamGroupLinesTest.java  |  1 +
 5 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f07f04f/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index b05ab0b..1e8f9a9 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -159,6 +159,8 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
                     processLine(line);
                 }
             }
+            // EOL so trigger any
+            processLine(null);
         }
         // important: do not close the reader as it will close the standard 
system.in etc.
     }
@@ -167,13 +169,17 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
      * Strategy method for processing the line
      */
     protected synchronized void processLine(String line) throws Exception {
+        boolean last = line == null;
+
         if (endpoint.getGroupLines() > 0) {
             // remember line
-            lines.add(line);
+            if (line != null) {
+                lines.add(line);
+            }
 
             // should we flush lines?
-            if (lines.size() >= endpoint.getGroupLines()) {
-                // spit out lines
+            if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() 
|| last)) {
+                // spit out lines as we hit the size, or it was the last
                 Exchange exchange = endpoint.createExchange();
 
                 // create message with the lines
@@ -187,7 +193,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
 
                 getProcessor().process(exchange);
             }
-        } else {
+        } else if (!last) {
             // single line
             Exchange exchange = endpoint.createExchange();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3f07f04f/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index c8aac89..e194774 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -99,6 +99,7 @@ public class StreamProducer extends DefaultProducer {
         LOG.debug("About to write to file: {}", fileName);
         File f = new File(fileName);
         // will create a new file if missing or append to existing
+        f.getParentFile().mkdirs();
         f.createNewFile();
         return new FileOutputStream(f, true);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f07f04f/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
new file mode 100644
index 0000000..59ad264
--- /dev/null
+++ 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class StreamGroupLinesLastStrategyTest extends 
StreamGroupLinesStrategyTest {
+    
+    @Test
+    public void testGroupLines() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        mock.setAssertPeriod(1000);
+
+        assertMockEndpointsSatisfied();
+
+        Object result = mock.getExchanges().get(0).getIn().getBody();
+        assertEquals("Get a wrong result.", "A\nB\nC\nD\n", result);
+
+        // we did not have 4 lines but since its the last it was triggered 
anyway
+        Object result2 = mock.getExchanges().get(1).getIn().getBody();
+        assertEquals("Get a wrong result.", "E\nF\n", result2);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("stream:file?fileName=target/stream/streamfile.txt&groupLines=4&groupStrategy=#myGroupStrategy").to("mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3f07f04f/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
index c4d000d..047319e 100644
--- 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
+++ 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
@@ -48,16 +48,15 @@ public class StreamGroupLinesStrategyTest extends 
StreamGroupLinesTest {
     public void testGroupLines() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(2);
+        mock.setAssertPeriod(1000);
 
         assertMockEndpointsSatisfied();
 
         Object result = mock.getExchanges().get(0).getIn().getBody();
         assertEquals("Get a wrong result.", "A\nB\nC\n", result);
-        
 
         Object result2 = mock.getExchanges().get(1).getIn().getBody();
         assertEquals("Get a wrong result.", "D\nE\nF\n", result2);
-        
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/3f07f04f/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
index 63b9108..09d92a6 100644
--- 
a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
+++ 
b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
@@ -59,6 +59,7 @@ public class StreamGroupLinesTest extends CamelTestSupport {
     public void testGroupLines() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(2);
+        mock.setAssertPeriod(1000);
 
         assertMockEndpointsSatisfied();
 

Reply via email to