added comment on ignore exception.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e9cde24 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e9cde24 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e9cde24 Branch: refs/heads/master Commit: 2e9cde243ba2841f4f9a2864d2a9292d0f672814 Parents: 9396c5d Author: Sela <[email protected]> Authored: Sat Jan 21 17:01:14 2017 +0200 Committer: Sela <[email protected]> Committed: Sat Jan 21 17:01:14 2017 +0200 ---------------------------------------------------------------------- .../kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2e9cde24/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 2de2174..36ab1fd 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1058,11 +1058,12 @@ public class KafkaIO { long offset = offsetConsumer.position(p.topicPartition); p.setLatestOffset(offset); } catch (Exception e) { + // An exception is expected if we've closed the reader in another thread. Ignore and exit. if (closed.get()) { break; } LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.", - this, p.topicPartition, e); + this, p.topicPartition, e); p.setLatestOffset(UNINITIALIZED_OFFSET); // reset }
