This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bd35490 [GOBBLIN-706] enable dynamic mappers
bd35490 is described below
commit bd35490c1437168c753f684ab2174d9c17ce8260
Author: Zihan Li <[email protected]>
AuthorDate: Tue Mar 26 11:28:15 2019 -0700
[GOBBLIN-706] enable dynamic mappers
Closes #2576 from ZihanLi58/master
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../extractor/extract/kafka/KafkaSource.java | 15 ++++++++++---
.../kafka/workunit/packer/KafkaWorkUnitPacker.java | 26 +++++++++++++---------
3 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 3bcbb69..8c920a3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -617,6 +617,7 @@ public class ConfigurationKeys {
   /** Specifies a static location in HDFS to upload jars to. Useful for sharing jars across different Gobblin runs.*/
   public static final String MR_JARS_DIR = "mr.jars.dir";
   public static final String MR_JOB_MAX_MAPPERS_KEY = "mr.job.max.mappers";
+  public static final String MR_TARGET_MAPPER_SIZE = "mr.target.mapper.size";
   public static final String MR_REPORT_METRICS_AS_COUNTERS_KEY = "mr.report.metrics.as.counters";
   public static final boolean DEFAULT_MR_REPORT_METRICS_AS_COUNTERS = false;
   public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 3e5dad6..cb0e2b5 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -258,10 +258,19 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
     // but aren't processed).
     createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-
-    int numOfMultiWorkunits =
+    //determine the number of mappers
+    int maxMapperNum =
         state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
-    List<WorkUnit> workUnitList = KafkaWorkUnitPacker.getInstance(this, state).pack(workUnits, numOfMultiWorkunits);
+    KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state);
+    int numOfMultiWorkunits = maxMapperNum;
+    if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
+      double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
+      LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
+      double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
+      numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
+      numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+    }
+    List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
     addTopicSpecificPropsToWorkUnits(workUnitList, topicSpecificStateMap);
     setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
     return workUnitList;
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index fef3219..676387d 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -103,16 +103,6 @@ public abstract class KafkaWorkUnitPacker {
     }
   };

-  protected double setWorkUnitEstSizes(Map<String, List<WorkUnit>> workUnitsByTopic) {
-    double totalEstDataSize = 0;
-    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
-      for (WorkUnit workUnit : workUnitsForTopic) {
-        setWorkUnitEstSize(workUnit);
-        totalEstDataSize += getWorkUnitEstSize(workUnit);
-      }
-    }
-    return totalEstDataSize;
-  }
   private void setWorkUnitEstSize(WorkUnit workUnit) {
     workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, this.sizeEstimator.calcEstimatedSize(workUnit));
@@ -364,6 +354,22 @@ public abstract class KafkaWorkUnitPacker {
   }

   /**
+   * Calculate the total size of the workUnits and set the estimated size for each workUnit
+   * @param workUnitsByTopic
+   * @return the total size of the input workUnits
+   */
+  public double setWorkUnitEstSizes(Map<String, List<WorkUnit>> workUnitsByTopic) {
+    double totalEstDataSize = 0;
+    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
+      for (WorkUnit workUnit : workUnitsForTopic) {
+        setWorkUnitEstSize(workUnit);
+        totalEstDataSize += getWorkUnitEstSize(workUnit);
+      }
+    }
+    return totalEstDataSize;
+  }
+
+  /**
    * Group {@link WorkUnit}s into {@link MultiWorkUnit}s. Each input {@link WorkUnit} corresponds to
    * a (topic, partition).
    */