This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 83c325d3b [GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline 
configurable (#3949)
83c325d3b is described below

commit 83c325d3bdb7ae29139171446bb43075878ff266
Author: Zihan Li <[email protected]>
AuthorDate: Wed May 15 17:20:38 2024 -0700

    [GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline configurable 
(#3949)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline configurable
    
    * address comments
    
    * change variable name
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/iceberg/publisher/GobblinMCEPublisher.java   |  8 +++++---
 .../source/extractor/extract/kafka/KafkaUtils.java       | 16 +++++++++++-----
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 49979b4ca..774d89bf9 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -115,9 +115,11 @@ public class GobblinMCEPublisher extends DataPublisher {
   }
 
   protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
-    return state.getPropAsList(offsetKey)
-        .stream()
-        .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> 
s.split(MAP_DELIMITER_KEY)[1]));
+    if (state.contains(offsetKey)) {
+      return state.getPropAsList(offsetKey).stream()
+          .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> 
s.split(MAP_DELIMITER_KEY)[1]));
+    }
+    return null;
   }
 
   /**
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index 4dc41ef57..cf2ed77dc 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -89,11 +89,17 @@ public class KafkaUtils {
       if 
(!state.contains(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i))) 
{
         break;
       }
-      KafkaPartition partition = new 
KafkaPartition.Builder().withTopicName(topicName)
-          
.withId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 i)))
-          
.withLeaderId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID,
 i)))
-          
.withLeaderHostAndPort(state.getProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_HOSTANDPORT,
 i)))
-          .build();
+      KafkaPartition.Builder builder = new 
KafkaPartition.Builder().withTopicName(topicName)
+          
.withId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 i)));
+      String partitionLeaderIdProperName = 
KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i);
+      String partitionLeaderHostPortProperName = 
KafkaUtils.getPartitionPropName(KafkaSource.LEADER_HOSTANDPORT, i);
+      if (state.contains(partitionLeaderIdProperName)) {
+        builder = 
builder.withLeaderId(state.getPropAsInt(partitionLeaderIdProperName));
+      }
+      if (state.contains(partitionLeaderHostPortProperName)) {
+        builder = 
builder.withLeaderHostAndPort(state.getProp(partitionLeaderHostPortProperName));
+      }
+      KafkaPartition partition = builder.build();
       partitions.add(partition);
     }
     if (partitions.isEmpty()) {

Reply via email to