Repository: flume
Updated Branches:
  refs/heads/trunk 03c8357df -> c718dae09


FLUME-3043. Fix NPE in Kafka Sink and Channel

When logging level is set to DEBUG, Kafka Sink and Kafka Channel may throw a 
NullPointerException.

This patch ensures that `metadata` is not null to avoid the exception.

This closes #125

Reviewers: Denes Arvay, Bessenyei Balázs Donát

(loleek via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c718dae0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c718dae0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c718dae0

Branch: refs/heads/trunk
Commit: c718dae09d10db640cb9eb59f8abb11bd385a799
Parents: 03c8357
Author: dengkai02 <[email protected]>
Authored: Sun Apr 16 21:17:41 2017 +0000
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Sun Apr 16 21:22:43 2017 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/flume/channel/kafka/KafkaChannel.java  | 4 +++-
 .../src/main/java/org/apache/flume/sink/kafka/KafkaSink.java    | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c718dae0/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 6684bea..5bd9be0 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -756,8 +756,10 @@ class ChannelCallback implements Callback {
     }
     if (log.isDebugEnabled()) {
       long batchElapsedTime = System.currentTimeMillis() - startTime;
-      log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
+      if (metadata != null) {
+        log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" +
                 metadata.partition() + "-" + metadata.offset() + "-" + 
batchElapsedTime);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c718dae0/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 241e900..68866c3 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -453,7 +453,10 @@ class SinkCallback implements Callback {
 
     if (logger.isDebugEnabled()) {
       long eventElapsedTime = System.currentTimeMillis() - startTime;
-      logger.debug("Acked message partition:{} ofset:{}",  
metadata.partition(), metadata.offset());
+      if (metadata != null) {
+        logger.debug("Acked message partition:{} ofset:{}", 
metadata.partition(),
+                metadata.offset());
+      }
       logger.debug("Elapsed time for send: {}", eventElapsedTime);
     }
   }

Reply via email to