This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 025655f [GOBBLIN-1229] Make topic specific state available to Kafka
workunit packer
025655f is described below
commit 025655f7b52a87126640c6f14c39e3f501310ac4
Author: sv2000 <[email protected]>
AuthorDate: Wed Aug 12 15:58:21 2020 -0700
[GOBBLIN-1229] Make topic specific state available to Kafka workunit
packer
Closes #3075 from sv2000/datasetAwareBinPacking
---
.../source/extractor/extract/kafka/KafkaSource.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 6a30756..6e5927b 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -102,6 +102,9 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
public static final Extract.TableType DEFAULT_TABLE_TYPE =
Extract.TableType.APPEND_ONLY;
public static final String DEFAULT_NAMESPACE_NAME = "KAFKA";
public static final String ALL_TOPICS = "all";
+ //A workunit property that contains the number of topic partitions for a
given topic. Useful for
+ //workunit size estimation to assign weights to a given topic partition.
+ public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
public static final String AVG_RECORD_SIZE = "avg.record.size";
public static final String AVG_RECORD_MILLIS = "avg.record.millis";
public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
@@ -277,8 +280,8 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
}
+ addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits,
numOfMultiWorkunits);
- addTopicSpecificPropsToWorkUnits(workUnitList, topicSpecificStateMap);
setLimiterReportKeyListToWorkUnits(workUnitList,
getLimiterExtractorReportKeys());
return workUnitList;
} catch (InstantiationException | IllegalAccessException |
ClassNotFoundException e) {
@@ -311,9 +314,11 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
}
}
- private void addTopicSpecificPropsToWorkUnits(List<WorkUnit> workUnits,
Map<String, State> topicSpecificStateMap) {
- for (WorkUnit workUnit : workUnits) {
- addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
+ private void addTopicSpecificPropsToWorkUnits(Map<String, List<WorkUnit>>
workUnits, Map<String, State> topicSpecificStateMap) {
+ for (List<WorkUnit> workUnitList : workUnits.values()) {
+ for (WorkUnit workUnit : workUnitList) {
+ addTopicSpecificPropsToWorkUnit(workUnit, topicSpecificStateMap);
+ }
}
}
@@ -380,7 +385,8 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
context.close();
List<WorkUnit> workUnits = Lists.newArrayList();
- for (KafkaPartition partition : topic.getPartitions()) {
+ List<KafkaPartition> topicPartitions = topic.getPartitions();
+ for (KafkaPartition partition : topicPartitions) {
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state,
topicSpecificState);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high
watermark to be the same
@@ -388,6 +394,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
if (!topicQualified) {
skipWorkUnit(workUnit);
}
+ workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
workUnits.add(workUnit);
}
}