This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b1006b4  [GOBBLIN-1337] Fix string representation of KafkaPartition in KafkaTopicGroupingWorkUnitPacker[]
b1006b4 is described below

commit b1006b4da3f036cffdb81978faf3f26f3e577f76
Author: suvasude <[email protected]>
AuthorDate: Tue Dec 15 07:32:27 2020 -0800

    [GOBBLIN-1337] Fix string representation of KafkaPartition in KafkaTopicGroupingWorkUnitPacker[]
    
    Closes #3174 from sv2000/kafkaBinPackerBug
---
 .../extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index ba5b608..d380dcf 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -251,7 +251,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
       List<WorkUnit> workUnits = entry.getValue();
       for (WorkUnit workUnit : workUnits) {
         int partitionId = 
Integer.parseInt(workUnit.getProp(KafkaSource.PARTITION_ID));
-        String topicPartition = new 
KafkaPartition.Builder().withTopicName(topic).withId(partitionId).toString();
+        String topicPartition = new 
KafkaPartition.Builder().withTopicName(topic).withId(partitionId).build().toString();
         KafkaStreamingExtractor.KafkaWatermark watermark = 
lastCommittedWatermarks.get(topicPartition);
         workUnit.setProp(PARTITION_WATERMARK, GSON.toJson(watermark));
         workUnit.setProp(PACKING_START_TIME_MILLIS, 
this.packingStartTimeMillis);

Reply via email to