This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 83c325d3b [GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline
configurable (#3949)
83c325d3b is described below
commit 83c325d3bdb7ae29139171446bb43075878ff266
Author: Zihan Li <[email protected]>
AuthorDate: Wed May 15 17:20:38 2024 -0700
[GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline configurable
(#3949)
* address comments
* use connection manager when HttpClient is not closeable
* [GOBBLIN-2068]Make offset range in Gobblin Metadata pipeline configurable
* address comments
* change variable name
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/iceberg/publisher/GobblinMCEPublisher.java | 8 +++++---
.../source/extractor/extract/kafka/KafkaUtils.java | 16 +++++++++++-----
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 49979b4ca..774d89bf9 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -115,9 +115,11 @@ public class GobblinMCEPublisher extends DataPublisher {
}
protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
- return state.getPropAsList(offsetKey)
- .stream()
- .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s ->
s.split(MAP_DELIMITER_KEY)[1]));
+ if (state.contains(offsetKey)) {
+ return state.getPropAsList(offsetKey).stream()
+ .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s ->
s.split(MAP_DELIMITER_KEY)[1]));
+ }
+ return null;
}
/**
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index 4dc41ef57..cf2ed77dc 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -89,11 +89,17 @@ public class KafkaUtils {
if
(!state.contains(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i)))
{
break;
}
- KafkaPartition partition = new
KafkaPartition.Builder().withTopicName(topicName)
-
.withId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
i)))
-
.withLeaderId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID,
i)))
-
.withLeaderHostAndPort(state.getProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_HOSTANDPORT,
i)))
- .build();
+ KafkaPartition.Builder builder = new
KafkaPartition.Builder().withTopicName(topicName)
+
.withId(state.getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
i)));
+ String partitionLeaderIdProperName =
KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i);
+ String partitionLeaderHostPortProperName =
KafkaUtils.getPartitionPropName(KafkaSource.LEADER_HOSTANDPORT, i);
+ if (state.contains(partitionLeaderIdProperName)) {
+ builder =
builder.withLeaderId(state.getPropAsInt(partitionLeaderIdProperName));
+ }
+ if (state.contains(partitionLeaderHostPortProperName)) {
+ builder =
builder.withLeaderHostAndPort(state.getProp(partitionLeaderHostPortProperName));
+ }
+ KafkaPartition partition = builder.build();
partitions.add(partition);
}
if (partitions.isEmpty()) {