This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 68e3a3b [GOBBLIN-879] Refactor bin-packer for better code reuse
68e3a3b is described below
commit 68e3a3bca3017326d8521e80cef5ea420a3625f2
Author: autumnust <[email protected]>
AuthorDate: Sun Sep 15 17:01:12 2019 -0700
[GOBBLIN-879] Refactor bin-packer for better code reuse
Closes #2732 from autumnust/refactorForPacker
---
.../kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java | 8 --------
.../extract/kafka/workunit/packer/KafkaWorkUnitPacker.java | 14 ++++++++++++--
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index f85fd97..85ba232 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -87,14 +87,6 @@ public class KafkaBiLevelWorkUnitPacker extends
KafkaWorkUnitPacker {
return worstFitDecreasingBinPacking(groups, numContainers);
}
- private static double calcTotalEstSizeForTopic(List<WorkUnit>
workUnitsForTopic) {
- double totalSize = 0;
- for (WorkUnit w : workUnitsForTopic) {
- totalSize += getWorkUnitEstSize(w);
- }
- return totalSize;
- }
-
private static double getPreGroupingSizeFactor(State state) {
return state.getPropAsDouble(WORKUNIT_PRE_GROUPING_SIZE_FACTOR,
DEFAULT_WORKUNIT_PRE_GROUPING_SIZE_FACTOR);
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 4630d6c..31e30d7 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -155,6 +155,17 @@ public abstract class KafkaWorkUnitPacker {
workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, estSize);
}
+ /**
+ * Calculate estimated size for a topic from all {@link WorkUnit}s belong to
it.
+ */
+ static double calcTotalEstSizeForTopic(List<WorkUnit> workUnitsForTopic) {
+ double totalSize = 0;
+ for (WorkUnit w : workUnitsForTopic) {
+ totalSize += getWorkUnitEstSize(w);
+ }
+ return totalSize;
+ }
+
protected static double getWorkUnitEstSize(WorkUnit workUnit) {
Preconditions.checkArgument(workUnit.contains(ESTIMATED_WORKUNIT_SIZE));
return workUnit.getPropAsDouble(ESTIMATED_WORKUNIT_SIZE);
@@ -271,7 +282,7 @@ public abstract class KafkaWorkUnitPacker {
/**
* Add a list of partitions of the same topic to a {@link WorkUnit}.
*/
- private static void populateMultiPartitionWorkUnit(List<KafkaPartition>
partitions, WorkUnit workUnit) {
+ static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions,
WorkUnit workUnit) {
Preconditions.checkArgument(!partitions.isEmpty(), "There should be at
least one partition");
GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic",
partitions.get(0).getTopicName()));
for (int i = 0; i < partitions.size(); i++) {
@@ -289,7 +300,6 @@ public abstract class KafkaWorkUnitPacker {
for (WorkUnit workUnit : multiWorkUnit.getWorkUnits()) {
partitions.add(KafkaUtils.getPartition(workUnit));
}
-
return partitions;
}