This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b1006b4 [GOBBLIN-1337] Fix string representation of KafkaPartition in
KafkaTopicGroupingWorkUnitPacker[]
b1006b4 is described below
commit b1006b4da3f036cffdb81978faf3f26f3e577f76
Author: suvasude <[email protected]>
AuthorDate: Tue Dec 15 07:32:27 2020 -0800
[GOBBLIN-1337] Fix string representation of KafkaPartition in
KafkaTopicGroupingWorkUnitPacker[]
Closes #3174 from sv2000/kafkaBinPackerBug
---
.../extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index ba5b608..d380dcf 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -251,7 +251,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
List<WorkUnit> workUnits = entry.getValue();
for (WorkUnit workUnit : workUnits) {
int partitionId =
Integer.parseInt(workUnit.getProp(KafkaSource.PARTITION_ID));
- String topicPartition = new
KafkaPartition.Builder().withTopicName(topic).withId(partitionId).toString();
+ String topicPartition = new
KafkaPartition.Builder().withTopicName(topic).withId(partitionId).build().toString();
KafkaStreamingExtractor.KafkaWatermark watermark =
lastCommittedWatermarks.get(topicPartition);
workUnit.setProp(PARTITION_WATERMARK, GSON.toJson(watermark));
workUnit.setProp(PACKING_START_TIME_MILLIS,
this.packingStartTimeMillis);