Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157537266

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {

             OffsetAndMetadata nextCommitOffsetAndMetadata = null;
             if (found) {
    -            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
    -                nextCommitMsg.getMetadata(Thread.currentThread()));
    +            try {
    +                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
    +                    JSON_MAPPER.writeValueAsString(new KafkaSpout.Info(Thread.currentThread(), context)));
    --- End diff --

    Will do.
---