Repository: flume
Updated Branches:
  refs/heads/trunk 03c8357df -> c718dae09
FLUME-3043. Fix NPE in Kafka Sink and Channel

When logging level is set to DEBUG, Kafka Sink and Kafka Channel may throw a NullPointerException.
This patch ensures that `metadata` is not null to avoid the exception.

This closes #125

Reviewers: Denes Arvay, Bessenyei Balázs Donát

(loleek via Bessenyei Balázs Donát)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c718dae0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c718dae0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c718dae0

Branch: refs/heads/trunk
Commit: c718dae09d10db640cb9eb59f8abb11bd385a799
Parents: 03c8357
Author: dengkai02 <[email protected]>
Authored: Sun Apr 16 21:17:41 2017 +0000
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Sun Apr 16 21:22:43 2017 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/flume/channel/kafka/KafkaChannel.java | 4 +++-
 .../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/c718dae0/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 6684bea..5bd9be0 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -756,8 +756,10 @@ class ChannelCallback implements Callback {
     }
     if (log.isDebugEnabled()) {
       long batchElapsedTime = System.currentTimeMillis() - startTime;
-      log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
+      if (metadata != null) {
+        log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
                 metadata.partition() + "-" + metadata.offset() + "-" + batchElapsedTime);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c718dae0/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 241e900..68866c3 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -453,7 +453,10 @@ class SinkCallback implements Callback {
 
     if (logger.isDebugEnabled()) {
       long eventElapsedTime = System.currentTimeMillis() - startTime;
-      logger.debug("Acked message partition:{} ofset:{}", metadata.partition(), metadata.offset());
+      if (metadata != null) {
+        logger.debug("Acked message partition:{} ofset:{}", metadata.partition(),
+            metadata.offset());
+      }
       logger.debug("Elapsed time for send: {}", eventElapsedTime);
     }
   }
