This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e501b62ffe [GOBBLIN-2118]Reduce no of network calls while fetching 
kafka offsets during startup (#4009)
e501b62ffe is described below

commit e501b62ffe646885d8b33198737bc16d38c04ba3
Author: Arpit Varshney <[email protected]>
AuthorDate: Wed Aug 7 00:50:31 2024 +0530

    [GOBBLIN-2118]Reduce no of network calls while fetching kafka offsets 
during startup (#4009)
    
    * Reduce no of network calls to kafka
    
    * Add UT
    
    * Added UTs
    
    * Refactoring code
    
    * Removing whitespaces
    
    * Removing whitespaces
    
    * Removing whitespaces
    
    * Addressed review comments
    
    * Changing contains to add for the failedOffsetsGetList
    
    * Added null check
    
    ---------
    
    Co-authored-by: Arpit Varshney <[email protected]>
---
 .../kafka/client/GobblinKafkaConsumerClient.java   |  19 ++++
 .../extractor/extract/kafka/KafkaSource.java       | 111 +++++++++++++++------
 .../extractor/extract/kafka/KafkaSourceTest.java   |  45 +++++++++
 3 files changed, 143 insertions(+), 32 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
index c75408e8f8..78534af827 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java
@@ -66,6 +66,25 @@ public interface GobblinKafkaConsumerClient extends 
Closeable {
    */
   public long getEarliestOffset(KafkaPartition partition) throws 
KafkaOffsetRetrievalFailureException;
 
+  /**
+   * Get the earliest available offset for a {@link Collection} of {@link 
KafkaPartition}s. NOTE: The default implementation
+   * is not efficient i.e. it will make a getEarliest() call for every {@link 
KafkaPartition}. Individual implementations
+   * of {@link GobblinKafkaConsumerClient} should override this method to use 
more advanced APIs of the underlying KafkaConsumer
+   * to retrieve the latest offsets for a collection of partitions.
+   *
+   * @param partitions for which earliest offset is retrieved
+   *
+   * @throws KafkaOffsetRetrievalFailureException - If the underlying 
kafka-client does not support getting the earliest offset
+   */
+  public default Map<KafkaPartition, Long> getEarliestOffsets(final 
Collection<KafkaPartition> partitions)
+      throws KafkaOffsetRetrievalFailureException {
+    final Map<KafkaPartition, Long> offsetMap = Maps.newHashMap();
+    for (final KafkaPartition partition : partitions) {
+      offsetMap.put(partition, getEarliestOffset(partition));
+    }
+    return offsetMap;
+  }
+
   /**
    * Get the latest available offset for a <code>partition</code>
    *
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 3eb0967658..d527759a70 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -236,9 +236,9 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
     try {
       Config config = ConfigUtils.propertiesToConfig(state.getProperties());
       GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = 
kafkaConsumerClientResolver
-          .resolveClass(
-              state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
-                  
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
+          .resolveClass(state.getProp(
+              GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
+              
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
 
       this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
 
@@ -440,29 +440,35 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
   /*
    * This function need to be thread safe since it is called in the Runnable
    */
-  private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState 
state,
+  public List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState 
state,
       Optional<State> topicSpecificState, Optional<Set<Integer>> 
filteredPartitions) {
     Timer.Context context = 
this.metricContext.timer("isTopicQualifiedTimer").time();
     boolean topicQualified = isTopicQualified(topic);
     context.close();
 
-    List<WorkUnit> workUnits = Lists.newArrayList();
-    List<KafkaPartition> topicPartitions = topic.getPartitions();
-    for (KafkaPartition partition : topicPartitions) {
-      if(filteredPartitions.isPresent() && 
!filteredPartitions.get().contains(partition.getId())) {
-        continue;
-      }
-      WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, 
topicSpecificState);
-      if (workUnit != null) {
-        // For disqualified topics, for each of its workunits set the high 
watermark to be the same
-        // as the low watermark, so that it will be skipped.
-        if (!topicQualified) {
-          skipWorkUnit(workUnit);
-        }
-        workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
-        workUnits.add(workUnit);
-      }
+    final List<WorkUnit> workUnits = Lists.newArrayList();
+    final List<KafkaPartition> topicPartitions = topic.getPartitions();
+    Map<KafkaPartition, WorkUnit> workUnitMap;
+
+    if (filteredPartitions.isPresent()) {
+      LOG.info("Filtered partitions for topic {} are {}", topic.getName(), 
filteredPartitions.get());
+      final List<KafkaPartition> filteredPartitionsToBeProcessed = 
topicPartitions.stream()
+          .filter(partition -> 
filteredPartitions.get().contains(partition.getId()))
+          .collect(Collectors.toList());
+      workUnitMap = getWorkUnits(filteredPartitionsToBeProcessed, state, 
topicSpecificState);
+    } else {
+      workUnitMap = getWorkUnits(topicPartitions, state, topicSpecificState);
+    }
+
+    if (!topicQualified) {
+      workUnitMap.values().forEach(KafkaSource::skipWorkUnit);
+    }
+
+    for (WorkUnit workUnit : workUnitMap.values()) {
+      workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
+      workUnits.add(workUnit);
     }
+
     this.partitionsToBeProcessed.addAll(topic.getPartitions());
     return workUnits;
   }
@@ -482,20 +488,61 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, 
workUnit.getLowWaterMark());
   }
 
-  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, 
SourceState state,
+  /**
+   * Get the workunits of all the partitions passed, this method fetches all 
the offsets for the partitions
+   * at once from kafka, and for each partiton creates a workunit.
+   * @param partitions
+   * @param state
+   * @param topicSpecificState
+   * @return
+   */
+  private Map<KafkaPartition, WorkUnit> 
getWorkUnits(Collection<KafkaPartition> partitions, SourceState state,
       Optional<State> topicSpecificState) {
-    Offsets offsets = new Offsets();
-
-    boolean failedToGetKafkaOffsets = false;
-
-    try (Timer.Context context = 
this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
-      offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
-      
offsets.setEarliestOffset(this.kafkaConsumerClient.get().getEarliestOffset(partition));
-      
offsets.setLatestOffset(this.kafkaConsumerClient.get().getLatestOffset(partition));
-    } catch (Throwable t) {
-      failedToGetKafkaOffsets = true;
-      LOG.error("Caught error in creating work unit for {}", partition, t);
+    final Map<KafkaPartition, Offsets> partitionOffsetMap = Maps.newHashMap();
+    final Set<KafkaPartition> failedOffsetsGetList = Sets.newHashSet();
+    try (final Timer.Context context = 
this.metricContext.timer(OFFSET_FETCH_TIMER).time()) {
+      // Fetch the offsets for all the partitions at once
+      final Map<KafkaPartition, Long> earliestOffsetMap = 
this.kafkaConsumerClient.get().getEarliestOffsets(partitions);
+      final Map<KafkaPartition, Long> latestOffsetMap = 
this.kafkaConsumerClient.get().getLatestOffsets(partitions);
+      for (KafkaPartition partition : partitions) {
+        final Offsets offsets = new Offsets();
+        offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
+        // Check if both earliest and latest offset are fetched for the 
partition, then set the offsets
+        if (earliestOffsetMap.containsKey(partition) && 
latestOffsetMap.containsKey(partition)) {
+          offsets.setEarliestOffset(earliestOffsetMap.get(partition));
+          offsets.setLatestOffset(latestOffsetMap.get(partition));
+          offsets.setOffsetFetchEpochTime(System.currentTimeMillis());
+          partitionOffsetMap.put(partition, offsets);
+          // If either is not available, put it in the failed offsets list
+        } else {
+          failedOffsetsGetList.add(partition);
+        }
+      }
+      LOG.info("Time taken to fetch offset for partitions {} is {} ms", 
partitions,
+          TimeUnit.NANOSECONDS.toMillis(context.stop()));
+    } catch (KafkaOffsetRetrievalFailureException e) {
+      // When exception occurred  while fetching earliest or latest offset for 
all the partitions,
+      // add all the partitions to fetchOffsetsFailedPartitions
+      failedOffsetsGetList.addAll(partitions);
+      LOG.error("Caught error in creating work unit for {}", partitions, e);
     }
+    if (!failedOffsetsGetList.isEmpty()) {
+      LOG.error("Failed to fetch offsets for partitions {}", 
failedOffsetsGetList);
+    }
+    final Map<KafkaPartition, WorkUnit> workUnitMap = Maps.newHashMap();
+    for (Map.Entry<KafkaPartition, Offsets> partitionOffset : 
partitionOffsetMap.entrySet()) {
+      WorkUnit workUnit =
+          getWorkUnitForTopicPartition(partitionOffset.getKey(), state, 
topicSpecificState, partitionOffset.getValue(),
+              failedOffsetsGetList.contains(partitionOffset.getKey()));
+      if (workUnit != null) {
+        workUnitMap.put(partitionOffset.getKey(), workUnit);
+      }
+    }
+    return workUnitMap;
+  }
+
+  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, 
SourceState state,
+      Optional<State> topicSpecificState, Offsets offsets, boolean 
failedToGetKafkaOffsets) {
 
     long previousOffset = 0;
     long previousOffsetFetchEpochTime = 0;
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
index fff8581fc0..8353f86824 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -22,6 +22,7 @@ import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -71,6 +72,40 @@ public class KafkaSourceTest {
 
   }
 
+  @Test
+  public void testGetWorkunitsForTopic() {
+    TestKafkaClient testKafkaClient = new TestKafkaClient();
+    testKafkaClient.testTopics = testTopics;
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, "TestPath");
+    state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE, 
KafkaWorkUnitPacker.PackerType.CUSTOM);
+    state.setProp(KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE,
+        
"org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker");
+    state.setProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, 
"MockTestKafkaConsumerClientFactory");
+    List<KafkaTopic> kafkaTopicList = toKafkaTopicList(testTopics);
+    TestKafkaSource testKafkaSource = new TestKafkaSource(testKafkaClient);
+
+    List<String> allTopics = testTopics;
+    Map<String, List<Integer>> filteredTopicPartitionMap = new HashMap<>();
+    filteredTopicPartitionMap.put(allTopics.get(0), new LinkedList<>());
+    filteredTopicPartitionMap.put(allTopics.get(1), new LinkedList<>());
+    filteredTopicPartitionMap.put(allTopics.get(2), new LinkedList<>());
+    filteredTopicPartitionMap.get(allTopics.get(0)).addAll(Arrays.asList(0, 
11));
+    filteredTopicPartitionMap.get(allTopics.get(1)).addAll(Arrays.asList(2, 8, 
10));
+    filteredTopicPartitionMap.get(allTopics.get(2)).addAll(Arrays.asList(1, 3, 
5, 7));
+    testKafkaSource.getWorkunitsForFilteredPartitions(state, 
Optional.of(filteredTopicPartitionMap), Optional.of(3));
+
+    for (KafkaTopic topic : kafkaTopicList) {
+      List<WorkUnit> rawWorkunitList =
+          testKafkaSource.getWorkUnitsForTopic(topic, state, 
Optional.absent(), Optional.absent());
+      // verify if the latest offset has been taken via 
testKafkaClient.getLatestOffsets()
+      for (WorkUnit workUnit : rawWorkunitList) {
+        Assert.assertEquals(
+            
Integer.valueOf(workUnit.getProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY)).intValue(),
 20);
+      }
+    }
+  }
+
   @Test
   public void testGetWorkunitsForFilteredPartitions() {
     TestKafkaClient testKafkaClient = new TestKafkaClient();
@@ -205,6 +240,16 @@ public class KafkaSourceTest {
       return toKafkaTopicList(DatasetFilterUtils.filter(testTopics, blacklist, 
whitelist));
     }
 
+    @Override
+    public Map<KafkaPartition, Long> 
getEarliestOffsets(Collection<KafkaPartition> partitions) {
+      return partitions.stream().collect(Collectors.toMap(p -> p, p -> 10L));
+    }
+
+    @Override
+    public Map<KafkaPartition, Long> 
getLatestOffsets(Collection<KafkaPartition> partitions) {
+      return partitions.stream().collect(Collectors.toMap(p -> p, p -> 20L));
+    }
+
     @Override
     public long getEarliestOffset(KafkaPartition partition)
         throws KafkaOffsetRetrievalFailureException {

Reply via email to