This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 68e3a3b  [GOBBLIN-879] Refactor bin-packer for better code reuse
68e3a3b is described below

commit 68e3a3bca3017326d8521e80cef5ea420a3625f2
Author: autumnust <[email protected]>
AuthorDate: Sun Sep 15 17:01:12 2019 -0700

    [GOBBLIN-879] Refactor bin-packer for better code reuse
    
    Closes #2732 from autumnust/refactorForPacker
---
 .../kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java  |  8 --------
 .../extract/kafka/workunit/packer/KafkaWorkUnitPacker.java | 14 ++++++++++++--
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index f85fd97..85ba232 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -87,14 +87,6 @@ public class KafkaBiLevelWorkUnitPacker extends 
KafkaWorkUnitPacker {
     return worstFitDecreasingBinPacking(groups, numContainers);
   }
 
-  private static double calcTotalEstSizeForTopic(List<WorkUnit> 
workUnitsForTopic) {
-    double totalSize = 0;
-    for (WorkUnit w : workUnitsForTopic) {
-      totalSize += getWorkUnitEstSize(w);
-    }
-    return totalSize;
-  }
-
   private static double getPreGroupingSizeFactor(State state) {
     return state.getPropAsDouble(WORKUNIT_PRE_GROUPING_SIZE_FACTOR, 
DEFAULT_WORKUNIT_PRE_GROUPING_SIZE_FACTOR);
   }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 4630d6c..31e30d7 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -155,6 +155,17 @@ public abstract class KafkaWorkUnitPacker {
     workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, estSize);
   }
 
+  /**
+   * Calculate estimated size for a topic from all {@link WorkUnit}s belong to 
it.
+   */
+  static double calcTotalEstSizeForTopic(List<WorkUnit> workUnitsForTopic) {
+    double totalSize = 0;
+    for (WorkUnit w : workUnitsForTopic) {
+      totalSize += getWorkUnitEstSize(w);
+    }
+    return totalSize;
+  }
+
   protected static double getWorkUnitEstSize(WorkUnit workUnit) {
     Preconditions.checkArgument(workUnit.contains(ESTIMATED_WORKUNIT_SIZE));
     return workUnit.getPropAsDouble(ESTIMATED_WORKUNIT_SIZE);
@@ -271,7 +282,7 @@ public abstract class KafkaWorkUnitPacker {
   /**
    * Add a list of partitions of the same topic to a {@link WorkUnit}.
    */
-  private static void populateMultiPartitionWorkUnit(List<KafkaPartition> 
partitions, WorkUnit workUnit) {
+  static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions, 
WorkUnit workUnit) {
     Preconditions.checkArgument(!partitions.isEmpty(), "There should be at 
least one partition");
     GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic", 
partitions.get(0).getTopicName()));
     for (int i = 0; i < partitions.size(); i++) {
@@ -289,7 +300,6 @@ public abstract class KafkaWorkUnitPacker {
     for (WorkUnit workUnit : multiWorkUnit.getWorkUnits()) {
       partitions.add(KafkaUtils.getPartition(workUnit));
     }
-
     return partitions;
   }
 

Reply via email to