This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 025655f  [GOBBLIN-1229] Make topic specific state available to Kafka workunit packer[]
025655f is described below

commit 025655f7b52a87126640c6f14c39e3f501310ac4
Author: sv2000 <[email protected]>
AuthorDate: Wed Aug 12 15:58:21 2020 -0700

    [GOBBLIN-1229] Make topic specific state available to Kafka workunit packer[]
    
    Closes #3075 from sv2000/datasetAwareBinPacking
---
 .../source/extractor/extract/kafka/KafkaSource.java     | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 6a30756..6e5927b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -102,6 +102,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final Extract.TableType DEFAULT_TABLE_TYPE = Extract.TableType.APPEND_ONLY;
   public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
   public static final String ALL_TOPICS = "all";
+  //A workunit property that contains the number of topic partitions for a given topic. Useful for
+  //workunit size estimation to assign weights to a given topic partition.
+  public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
   public static final String AVG_RECORD_SIZE = "avg.record.size";
   public static final String AVG_RECORD_MILLIS = "avg.record.millis";
   public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
@@ -277,8 +280,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
         numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
       }
+      addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
       List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
-      addTopicSpecificPropsToWorkUnits(workUnitList, topicSpecificStateMap);
       setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
       return workUnitList;
     } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
@@ -311,9 +314,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     }
   }
 
-  private void addTopicSpecificPropsToWorkUnits(List<WorkUnit> workUnits, Map<String, State> topicSpecificStateMap) {
-    for (WorkUnit workUnit : workUnits) {
-      addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
+  private void addTopicSpecificPropsToWorkUnits(Map<String, List<WorkUnit>> workUnits, Map<String, State> topicSpecificStateMap) {
+    for (List<WorkUnit> workUnitList : workUnits.values()) {
+      for (WorkUnit workUnit : workUnitList) {
+        addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
+      }
     }
   }
 
@@ -380,7 +385,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     context.close();
 
     List<WorkUnit> workUnits = Lists.newArrayList();
-    for (KafkaPartition partition : topic.getPartitions()) {
+    List<KafkaPartition> topicPartitions = topic.getPartitions();
+    for (KafkaPartition partition : topicPartitions) {
       WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
       if (workUnit != null) {
         // For disqualified topics, for each of its workunits set the high watermark to be the same
@@ -388,6 +394,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
         if (!topicQualified) {
           skipWorkUnit(workUnit);
         }
+        workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
         workUnits.add(workUnit);
       }
     }

Reply via email to