Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157362347
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
OffsetAndMetadata nextCommitOffsetAndMetadata = null;
if (found) {
- nextCommitOffsetAndMetadata = new
OffsetAndMetadata(nextCommitOffset,
- nextCommitMsg.getMetadata(Thread.currentThread()));
+ try {
+ nextCommitOffsetAndMetadata = new
OffsetAndMetadata(nextCommitOffset,
+ JSON_MAPPER.writeValueAsString(new
KafkaSpout.Info(Thread.currentThread(), context)));
--- End diff --
Consider moving the JSON serialization into KafkaSpout commitOffsets method
instead, then you can pass it as a parameter to this method and we can get away
with having only one ObjectMapper. Currently we can just create the info once
(e.g. in open) and reuse rather than creating and serializing a new one for
each commit and partition.
---