[
https://issues.apache.org/jira/browse/KAFKA-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317292#comment-16317292
]
ASF GitHub Bot commented on KAFKA-6422:
---------------------------------------
hachikuji closed pull request #4387: KAFKA-6422 Mirror maker will throw null
pointer exception when the message value is null
URL: https://github.com/apache/kafka/pull/4387
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 618fd2a95b9..907fe20f414 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -67,7 +67,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private[tools] var producer: MirrorMakerProducer = null
private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
- private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
+ private val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages not successfully sent by mirror maker.
private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
private var messageHandler: MirrorMakerMessageHandler = null
@@ -384,7 +384,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
def cleanShutdown() {
- if (isShuttingdown.compareAndSet(false, true)) {
+ if (isShuttingDown.compareAndSet(false, true)) {
info("Start clean shutdown.")
// Shutdown consumer threads.
info("Shutting down consumer threads.")
@@ -426,7 +426,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
try {
while (!exitingOnSendFailure && !shuttingDown &&
mirrorMakerConsumer.hasData) {
val data = mirrorMakerConsumer.receive()
- trace("Sending message with value size %d and offset
%d".format(data.value.length, data.offset))
+ if (data.value != null) {
+ trace("Sending message with value size %d and offset
%d.".format(data.value.length, data.offset))
+ } else {
+ trace("Sending message with null value and offset
%d.".format(data.offset))
+ }
val records = messageHandler.handle(data)
records.asScala.foreach(producer.send)
maybeFlushAndCommitOffsets()
@@ -459,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
shutdownLatch.countDown()
info("Mirror maker thread stopped")
// if it exits accidentally, stop the entire mirror maker
- if (!isShuttingdown.get()) {
+ if (!isShuttingDown.get()) {
fatal("Mirror maker thread exited abnormally, stopping the whole
mirror maker.")
sys.exit(-1)
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> When trace-level logging is enabled in mirror maker, it throws a null pointer
> exception and the mirror maker shuts down
> --------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6422
> URL: https://issues.apache.org/jira/browse/KAFKA-6422
> Project: Kafka
> Issue Type: Bug
> Components: tools
> Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 0.11.0.1,
> 0.11.0.2
> Reporter: Xin Li
> Assignee: Xin Li
> Priority: Minor
> Labels: easyfix
> Fix For: 0.11.0.0
>
>
> https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/tools/MirrorMaker.scala#L414
> When trace-level logging is enabled in mirror maker and a message value is
> null, the trace statement throws a null pointer exception, and mirror maker
> shuts down as a result.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)