This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new e501b62ffe [GOBBLIN-2118] Reduce the number of network calls while fetching Kafka offsets during startup (#4009)
e501b62ffe is described below
commit e501b62ffe646885d8b33198737bc16d38c04ba3
Author: Arpit Varshney <[email protected]>
AuthorDate: Wed Aug 7 00:50:31 2024 +0530
[GOBBLIN-2118] Reduce the number of network calls while fetching Kafka offsets during startup (#4009)
* Reduce the number of network calls to Kafka
* Add UT
* Added UTs
* Refactoring code
* Removing whitespaces
* Removing whitespaces
* Removing whitespaces
* Addressed review comments
* Changing contains to add for the failedOffsetsGetList
* Added null check
---------
Co-authored-by: Arpit Varshney <[email protected]>
---
.../kafka/client/GobblinKafkaConsumerClient.java | 19 ++++
.../extractor/extract/kafka/KafkaSource.java | 111 +++++++++++++++------
.../extractor/extract/kafka/KafkaSourceTest.java | 45 +++++++++
3 files changed, 143 insertions(+), 32 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index c75408e8f8..78534af827 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -66,6 +66,25 @@ public interface GobblinKafkaConsumerClient extends Closeable {
    */
  public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException;
+  /**
+   * Get the earliest available offset for a {@link Collection} of {@link KafkaPartition}s. NOTE: the default
+   * implementation is not efficient, i.e. it makes a getEarliestOffset() call for every {@link KafkaPartition}.
+   * Individual implementations of {@link GobblinKafkaConsumerClient} should override this method to use more
+   * advanced APIs of the underlying KafkaConsumer to retrieve the earliest offsets for a collection of partitions.
+   *
+   * @param partitions the partitions for which the earliest offsets are retrieved
+   *
+   * @throws KafkaOffsetRetrievalFailureException if the underlying kafka-client fails to retrieve the earliest offsets
+   */
+  public default Map<KafkaPartition, Long> getEarliestOffsets(final Collection<KafkaPartition> partitions)
+      throws KafkaOffsetRetrievalFailureException {
+    final Map<KafkaPartition, Long> offsetMap = Maps.newHashMap();
+    for (final KafkaPartition partition : partitions) {
+      offsetMap.put(partition, getEarliestOffset(partition));
+    }
+    return offsetMap;
+  }
+
   /**
    * Get the latest available offset for a <code>partition</code>
    *
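
For context on the override suggestion in the javadoc above: an implementation backed directly by kafka-clients could batch the lookup with Consumer#beginningOffsets, which resolves a whole collection of partitions in one request. The sketch below is illustrative only and not part of this commit; it assumes the implementation holds an org.apache.kafka.clients.consumer.Consumer in a field named consumer, and that Gobblin's KafkaPartition exposes getTopicName() and getId().

  // Hypothetical override in a kafka-clients based GobblinKafkaConsumerClient
  // implementation (not part of this commit; field and accessor names assumed).
  @Override
  public Map<KafkaPartition, Long> getEarliestOffsets(final Collection<KafkaPartition> partitions)
      throws KafkaOffsetRetrievalFailureException {
    // Translate Gobblin partitions to Kafka TopicPartitions, keeping the reverse mapping.
    final Map<TopicPartition, KafkaPartition> reverseMap = new HashMap<>();
    for (final KafkaPartition partition : partitions) {
      reverseMap.put(new TopicPartition(partition.getTopicName(), partition.getId()), partition);
    }
    // Consumer#beginningOffsets resolves the whole collection in a single request,
    // rather than one getEarliestOffset() round trip per partition.
    final Map<TopicPartition, Long> beginningOffsets = this.consumer.beginningOffsets(reverseMap.keySet());
    final Map<KafkaPartition, Long> offsetMap = new HashMap<>();
    for (final Map.Entry<TopicPartition, Long> entry : beginningOffsets.entrySet()) {
      offsetMap.put(reverseMap.get(entry.getKey()), entry.getValue());
    }
    return offsetMap;
  }

A getLatestOffsets() override would have the same shape, using Consumer#endOffsets instead.
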
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 3eb0967658..d527759a70 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -236,9 +236,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     try {
       Config config = ConfigUtils.propertiesToConfig(state.getProperties());
       GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
-          .resolveClass(
-              state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
-                  DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
+          .resolveClass(state.getProp(
+              GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
+              DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
       this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
@@ -440,29 +440,35 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   /*
    * This function needs to be thread safe since it is called in the Runnable
    */
-  private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state,
+  public List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state,
       Optional<State> topicSpecificState, Optional<Set<Integer>> filteredPartitions) {
     Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
     boolean topicQualified = isTopicQualified(topic);
     context.close();
-    List<WorkUnit> workUnits = Lists.newArrayList();
-    List<KafkaPartition> topicPartitions = topic.getPartitions();
-    for (KafkaPartition partition : topicPartitions) {
-      if (filteredPartitions.isPresent() && !filteredPartitions.get().contains(partition.getId())) {
-        continue;
-      }
-      WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
-      if (workUnit != null) {
-        // For disqualified topics, for each of its workunits set the high watermark to be the same
-        // as the low watermark, so that it will be skipped.
-        if (!topicQualified) {
-          skipWorkUnit(workUnit);
-        }
-        workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
-        workUnits.add(workUnit);
-      }
+    final List<WorkUnit> workUnits = Lists.newArrayList();
+    final List<KafkaPartition> topicPartitions = topic.getPartitions();
+    Map<KafkaPartition, WorkUnit> workUnitMap;
+
+    if (filteredPartitions.isPresent()) {
+      LOG.info("Filtered partitions for topic {} are {}", topic.getName(), filteredPartitions.get());
+      final List<KafkaPartition> filteredPartitionsToBeProcessed = topicPartitions.stream()
+          .filter(partition -> filteredPartitions.get().contains(partition.getId()))
+          .collect(Collectors.toList());
+      workUnitMap = getWorkUnits(filteredPartitionsToBeProcessed, state, topicSpecificState);
+    } else {
+      workUnitMap = getWorkUnits(topicPartitions, state, topicSpecificState);
+    }
+
+    // For disqualified topics, set each workunit's high watermark to its low watermark so that it is skipped.
+    if (!topicQualified) {
+      workUnitMap.values().forEach(KafkaSource::skipWorkUnit);
+    }
+
+    for (WorkUnit workUnit : workUnitMap.values()) {
+      workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
+      workUnits.add(workUnit);
     }
+
     this.partitionsToBeProcessed.addAll(topic.getPartitions());
     return workUnits;
   }
@@ -482,20 +488,61 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, workUnit.getLowWaterMark());
   }
-  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
+  /**
+   * Get the workunits for all the partitions passed in. This method fetches the offsets of all the partitions
+   * from Kafka at once, and then creates a workunit for each partition.
+   * @param partitions the partitions to create workunits for
+   * @param state the source state
+   * @param topicSpecificState optional topic-specific state
+   * @return a map from each partition to its workunit
+   */
+  private Map<KafkaPartition, WorkUnit> getWorkUnits(Collection<KafkaPartition> partitions, SourceState state,
       Optional<State> topicSpecificState) {
-    Offsets offsets = new Offsets();
-
-    boolean failedToGetKafkaOffsets = false;
-
-    try (Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
-      offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
-      offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
-      offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
-    } catch (Throwable t) {
-      failedToGetKafkaOffsets = true;
-      LOG.error("Caught error in creating work unit for {}", partition, t);
+    final Map<KafkaPartition, Offsets> partitionOffsetMap = Maps.newHashMap();
+    final Set<KafkaPartition> failedOffsetsGetList = Sets.newHashSet();
+    try (final Timer.Context context = this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
+      // Fetch the offsets for all the partitions at once
+      final Map<KafkaPartition, Long> earliestOffsetMap = this.kafkaConsumerClient.get().getEarliestOffsets(partitions);
+      final Map<KafkaPartition, Long> latestOffsetMap = this.kafkaConsumerClient.get().getLatestOffsets(partitions);
+      for (KafkaPartition partition : partitions) {
+        final Offsets offsets = new Offsets();
+        offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
+        // Only if both the earliest and latest offsets were fetched for the partition, set the offsets
+        if (earliestOffsetMap.containsKey(partition) && latestOffsetMap.containsKey(partition)) {
+          offsets.setEarliestOffset(earliestOffsetMap.get(partition));
+          offsets.setLatestOffset(latestOffsetMap.get(partition));
+          partitionOffsetMap.put(partition, offsets);
+        } else {
+          // If either offset is missing, record the partition as failed
+          failedOffsetsGetList.add(partition);
+        }
+      }
+      LOG.info("Time taken to fetch offsets for partitions {} is {} ms", partitions,
+          TimeUnit.NANOSECONDS.toMillis(context.stop()));
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      // An exception here means the earliest or latest offsets could not be fetched for any partition,
+      // so add all the partitions to failedOffsetsGetList
+      failedOffsetsGetList.addAll(partitions);
+      LOG.error("Caught error in creating work units for {}", partitions, e);
     }
+    if (!failedOffsetsGetList.isEmpty()) {
+      LOG.error("Failed to fetch offsets for partitions {}", failedOffsetsGetList);
+    }
+    final Map<KafkaPartition, WorkUnit> workUnitMap = Maps.newHashMap();
+    for (Map.Entry<KafkaPartition, Offsets> partitionOffset : partitionOffsetMap.entrySet()) {
+      WorkUnit workUnit =
+          getWorkUnitForTopicPartition(partitionOffset.getKey(), state, topicSpecificState, partitionOffset.getValue(),
+              failedOffsetsGetList.contains(partitionOffset.getKey()));
+      if (workUnit != null) {
+        workUnitMap.put(partitionOffset.getKey(), workUnit);
+      }
+    }
+    return workUnitMap;
+  }
+
+  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
+      Optional<State> topicSpecificState, Offsets offsets, boolean failedToGetKafkaOffsets) {
     long previousOffset = 0;
     long previousOffsetFetchEpochTime = 0;
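
To summarize the effect of the hunk above: for a topic with N partitions, the old code issued two offset requests per partition (2 * N network calls), while the new code issues two batched requests per topic. A minimal sketch of the call pattern, where client stands for any GobblinKafkaConsumerClient and error handling is elided:

    // Before this change: 2 * N network calls for N partitions.
    for (KafkaPartition partition : partitions) {
      long earliest = client.getEarliestOffset(partition); // one call per partition
      long latest = client.getLatestOffset(partition);     // and one more
    }

    // After this change: two batched calls per topic, independent of partition count.
    Map<KafkaPartition, Long> earliestOffsets = client.getEarliestOffsets(partitions);
    Map<KafkaPartition, Long> latestOffsets = client.getLatestOffsets(partitions);

The boolean now passed into getWorkUnitForTopicPartition carries the same information as the old failedToGetKafkaOffsets flag, so the pre-existing handling of partitions whose offsets could not be fetched is preserved.
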
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
index fff8581fc0..8353f86824 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -22,6 +22,7 @@ import com.typesafe.config.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -71,6 +72,40 @@ public class KafkaSourceTest {
}
+  @Test
+  public void testGetWorkunitsForTopic() {
+    TestKafkaClient testKafkaClient = new TestKafkaClient();
+    testKafkaClient.testTopics = testTopics;
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "TestPath");
+    state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE, KafkaWorkUnitPacker.PackerType.CUSTOM);
+    state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE,
+        "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker");
+    state.setProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, "MockTestKafkaConsumerClientFactory");
+    List<KafkaTopic> kafkaTopicList = toKafkaTopicList(testTopics);
+    TestKafkaSource testKafkaSource = new TestKafkaSource(testKafkaClient);
+
+    List<String> allTopics = testTopics;
+    Map<String, List<Integer>> filteredTopicPartitionMap = new HashMap<>();
+    filteredTopicPartitionMap.put(allTopics.get(0), new LinkedList<>());
+    filteredTopicPartitionMap.put(allTopics.get(1), new LinkedList<>());
+    filteredTopicPartitionMap.put(allTopics.get(2), new LinkedList<>());
+    filteredTopicPartitionMap.get(allTopics.get(0)).addAll(Arrays.asList(0, 11));
+    filteredTopicPartitionMap.get(allTopics.get(1)).addAll(Arrays.asList(2, 8, 10));
+    filteredTopicPartitionMap.get(allTopics.get(2)).addAll(Arrays.asList(1, 3, 5, 7));
+    testKafkaSource.getWorkunitsForFilteredPartitions(state, Optional.of(filteredTopicPartitionMap), Optional.of(3));
+
+    for (KafkaTopic topic : kafkaTopicList) {
+      List<WorkUnit> rawWorkunitList =
+          testKafkaSource.getWorkUnitsForTopic(topic, state, Optional.absent(), Optional.absent());
+      // Verify that the latest offset was obtained via testKafkaClient.getLatestOffsets()
+      for (WorkUnit workUnit : rawWorkunitList) {
+        Assert.assertEquals(
+            Integer.valueOf(workUnit.getProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY)).intValue(), 20);
+      }
+    }
+  }
+
   @Test
   public void testGetWorkunitsForFilteredPartitions() {
     TestKafkaClient testKafkaClient = new TestKafkaClient();
@@ -205,6 +240,16 @@ public class KafkaSourceTest {
       return toKafkaTopicList(DatasetFilterUtils.filter(testTopics, blacklist, whitelist));
     }
+    @Override
+    public Map<KafkaPartition, Long> getEarliestOffsets(Collection<KafkaPartition> partitions) {
+      return partitions.stream().collect(Collectors.toMap(p -> p, p -> 10L));
+    }
+
+    @Override
+    public Map<KafkaPartition, Long> getLatestOffsets(Collection<KafkaPartition> partitions) {
+      return partitions.stream().collect(Collectors.toMap(p -> p, p -> 20L));
+    }
+
     @Override
     public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {