Repository: camel Updated Branches: refs/heads/camel-2.14.x 08eb8bf61 -> b552ceac1
[CAMEL-8434] Fix infinite read from empty file in hdfs (cherry picked from commit c1a74982c6e7d3ab05f40da320099a30d8f26d33) Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b552ceac Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b552ceac Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b552ceac Branch: refs/heads/camel-2.14.x Commit: b552ceac1c3353e59b1ce261222ac6e91abddd1b Parents: 08eb8bf Author: Grzegorz Grzybek <[email protected]> Authored: Thu Mar 5 19:31:00 2015 +0100 Committer: Grzegorz Grzybek <[email protected]> Committed: Thu Mar 5 19:31:52 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/camel/component/hdfs/HdfsInputStream.java | 2 +- .../java/org/apache/camel/component/hdfs/HdfsConsumerTest.java | 2 ++ .../java/org/apache/camel/component/hdfs2/HdfsInputStream.java | 2 +- .../java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b552ceac/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java index e0b1562..4273dc5 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java @@ -69,7 +69,7 @@ public class HdfsInputStream implements Closeable { long nb = fileType.next(this, key, value); // when zero bytes was read from given type of file, we may still have a record (e.g., empty file) // null value.value is the only indication that no (new) record/chunk was read - if (nb == 0 && numOfReadBytes.get() > 0) { + if (nb == 0 && numOfReadMessages.get() > 0) { // we've read all chunks from file, which size is exact multiple the chunk size return -1; } http://git-wip-us.apache.org/repos/asf/camel/blob/b552ceac/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java index 9628dd1..92879d3 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java @@ -178,6 +178,8 @@ public class HdfsConsumerTest extends HdfsTestSupport { }); context.start(); + Thread.sleep(2000); + resultEndpoint.assertIsSatisfied(); assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0)); } http://git-wip-us.apache.org/repos/asf/camel/blob/b552ceac/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java index b38beaf..70794e9 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java @@ -75,7 +75,7 @@ public class HdfsInputStream implements Closeable { long nb = fileType.next(this, key, value); // when zero bytes was read from given type of file, we may still have a record (e.g., empty file) // null value.value is the only indication that no (new) record/chunk was read - if (nb == 0 && numOfReadBytes.get() > 0) { + if (nb == 0 && numOfReadMessages.get() > 0) { // we've read all chunks from file, which size is exact multiple the chunk size return -1; } http://git-wip-us.apache.org/repos/asf/camel/blob/b552ceac/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java index 6db80e6..c9cbbaf 100644 --- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java +++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java @@ -179,6 +179,8 @@ public class HdfsConsumerTest extends HdfsTestSupport { }); context.start(); + Thread.sleep(2000); + resultEndpoint.assertIsSatisfied(); assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0)); }
