This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bd35490  [GOBBLIN-706] enable dynamic mappers
bd35490 is described below

commit bd35490c1437168c753f684ab2174d9c17ce8260
Author: Zihan Li <[email protected]>
AuthorDate: Tue Mar 26 11:28:15 2019 -0700

    [GOBBLIN-706] enable dynamic mappers
    
    Closes #2576 from ZihanLi58/master
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../extractor/extract/kafka/KafkaSource.java       | 15 ++++++++++---
 .../kafka/workunit/packer/KafkaWorkUnitPacker.java | 26 +++++++++++++---------
 3 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3bcbb69..8c920a3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -617,6 +617,7 @@ public class ConfigurationKeys {
   /** Specifies a static location in HDFS to upload jars to. Useful for sharing jars across different Gobblin runs.*/
   public static final String MR_JARS_DIR = "mr.jars.dir";
   public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
+  public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
   public static final String MR_REPORT_METRICS_AS_COUNTERS_KEY = "mr.report.metrics.as.counters";
   public static final boolean DEFAULT_MR_REPORT_METRICS_AS_COUNTERS = false;
   public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 3e5dad6..cb0e2b5 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -258,10 +258,19 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
       // but aren't processed).
       createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-
-      int numOfMultiWorkunits =
+      //determine the number of mappers
+      int maxMapperNum =
           state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
-      List<WorkUnit> workUnitList = KafkaWorkUnitPacker.getInstance(this, state).pack(workUnits, numOfMultiWorkunits);
+      KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state);
+      int numOfMultiWorkunits = maxMapperNum;
+      if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
+        double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
+        LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
+        double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
+        numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
+        numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+      }
+      List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
       addTopicSpecificPropsToWorkUnits(workUnitList, topicSpecificStateMap);
       setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
       return workUnitList;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index fef3219..676387d 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -103,16 +103,6 @@ public abstract class KafkaWorkUnitPacker {
     }
   };
 
-  protected double setWorkUnitEstSizes(Map<String, List<WorkUnit>> workUnitsByTopic) {
-    double totalEstDataSize = 0;
-    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
-      for (WorkUnit workUnit : workUnitsForTopic) {
-        setWorkUnitEstSize(workUnit);
-        totalEstDataSize += getWorkUnitEstSize(workUnit);
-      }
-    }
-    return totalEstDataSize;
-  }
 
   private void setWorkUnitEstSize(WorkUnit workUnit) {
     workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, this.sizeEstimator.calcEstimatedSize(workUnit));
@@ -364,6 +354,22 @@ public abstract class KafkaWorkUnitPacker {
   }
 
   /**
+   * Calculate the total size of the workUnits and set the estimated size for each workUnit
+   * @param workUnitsByTopic
+   * @return the total size of the input workUnits
+   */
+  public double setWorkUnitEstSizes(Map<String, List<WorkUnit>> workUnitsByTopic) {
+    double totalEstDataSize = 0;
+    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
+      for (WorkUnit workUnit : workUnitsForTopic) {
+        setWorkUnitEstSize(workUnit);
+        totalEstDataSize += getWorkUnitEstSize(workUnit);
+      }
+    }
+    return totalEstDataSize;
+  }
+
+  /**
    * Group {@link WorkUnit}s into {@link MultiWorkUnit}s. Each input {@link WorkUnit} corresponds to
    * a (topic, partition).
    */

Reply via email to