[ https://issues.apache.org/jira/browse/GOBBLIN-1922?focusedWorklogId=884922&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-884922 ]
ASF GitHub Bot logged work on GOBBLIN-1922:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Oct/23 03:10
Start Date: 13/Oct/23 03:10
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1357701681
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -268,17 +271,11 @@ public String apply(KafkaTopic topic) {
// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
- //determine the number of mappers
- int maxMapperNum = state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+
KafkaWorkUnitPacker kafkaWorkUnitPacker =
KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
- int numOfMultiWorkunits = maxMapperNum;
- if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
- double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
- LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
- double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
- numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
- numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+ int numOfMultiWorkunits= 0;
+ if(!(kafkaWorkUnitPacker instanceof KafkaTopicGroupingWorkUnitPacker)) {
Review Comment:
why do we need this check?
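For context on the question, the lines removed in this hunk chose the number of multi-work-units as follows: default to the mapper cap and, only when MR_TARGET_MAPPER_SIZE is set, divide the total estimated data size by the target size and cap the result. A minimal Java restatement with the config lookups replaced by plain parameters; MultiWorkUnitCountSketch and its method are hypothetical names, not Gobblin code:

```java
import java.util.OptionalDouble;

// Hypothetical standalone restatement of the sizing logic removed in this hunk.
final class MultiWorkUnitCountSketch {
  private MultiWorkUnitCountSketch() {}

  /**
   * @param maxMapperNum     value of MR_JOB_MAX_MAPPERS_KEY (or its default)
   * @param targetMapperSize value of MR_TARGET_MAPPER_SIZE, if configured
   * @param totalEstDataSize sum of the per-work-unit size estimates
   */
  static int numOfMultiWorkunits(int maxMapperNum, OptionalDouble targetMapperSize, double totalEstDataSize) {
    if (!targetMapperSize.isPresent()) {
      // No target size configured: one multi-work-unit per allowed mapper.
      return maxMapperNum;
    }
    // Truncated quotient plus one (so always at least one),
    // capped at the configured mapper count.
    int sized = (int) (totalEstDataSize / targetMapperSize.getAsDouble()) + 1;
    return Math.min(sized, maxMapperNum);
  }
}
```

For example, a 25.0 estimated total with a 10.0 target and a cap of 100 yields 3. An exact multiple such as 30.0 still adds one and yields 4.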
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
}
}
+ public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {
Review Comment:
Add javadoc here?
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java:
##########
@@ -169,6 +170,22 @@ default long committed(KafkaPartition partition) {
public default void assignAndSeek(List<KafkaPartition> topicPartitions,
Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }
+ /**
+ * Get a list of all kafka topics
+ */
+ public default List<KafkaTopic> getTopics() {return Collections.emptyList();};
Review Comment:
What's the reason for moving this method here?
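The method being moved is a default interface method, so any GobblinKafkaConsumerClient implementation that does not override it will report no topics rather than fail. A minimal illustration of that behavior. TopicListingClient, NoListingClient, and StaticListingClient are hypothetical, and topics are plain strings instead of KafkaTopic:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a client interface with a default getTopics().
interface TopicListingClient {
  /** Returns all topics; implementations that cannot enumerate topics inherit the empty default. */
  default List<String> getTopics() {
    return Collections.emptyList();
  }
}

// Does not override getTopics(), so callers silently get an empty list.
final class NoListingClient implements TopicListingClient {}

// Overrides getTopics() with a fixed, immutable topic list.
final class StaticListingClient implements TopicListingClient {
  private final List<String> topics;

  StaticListingClient(List<String> topics) {
    this.topics = List.copyOf(topics);
  }

  @Override
  public List<String> getTopics() {
    return topics;
  }
}
```

Callers therefore cannot distinguish "no topics exist" from "this client does not support listing", which may be worth considering when choosing where the method lives.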
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
}
}
+ public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {
Review Comment:
why min container is needed here and how will we determine it?
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -886,13 +997,24 @@ private class WorkUnitCreator implements Runnable {
private final SourceState state;
private final Optional<State> topicSpecificState;
private final Map<String, List<WorkUnit>> allTopicWorkUnits;
+ private final Optional<Set<Integer>> filteredPartitionsId;
WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
+ this.filteredPartitionsId = Optional.absent();
+ }
+
+ WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Review Comment:
duplicate code, extract the common part?
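One common way to remove the duplicated constructor body is to have the shorter constructor delegate to the longer one with this(...), so field assignment lives in one place. A simplified sketch: WorkUnitCreatorSketch is hypothetical, it keeps only the topic name and the partition filter, and it uses java.util.Optional where the PR uses Guava's Optional.absent():

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for WorkUnitCreator.
final class WorkUnitCreatorSketch {
  private final String topic;
  private final Optional<Set<Integer>> filteredPartitionsId;

  // Original signature: no filter, so every partition is eligible.
  WorkUnitCreatorSketch(String topic) {
    this(topic, Optional.empty());
  }

  // New signature: the single place where fields are assigned.
  WorkUnitCreatorSketch(String topic, Optional<Set<Integer>> filteredPartitionsId) {
    this.topic = topic;
    this.filteredPartitionsId = filteredPartitionsId;
  }

  String topic() {
    return topic;
  }

  // True when no filter is set, or when the partition is in the filter.
  boolean shouldCreate(int partitionId) {
    return filteredPartitionsId.map(ids -> ids.contains(partitionId)).orElse(true);
  }
}
```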
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
}
}
+ public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {
+ Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
Review Comment:
I feel most of the code here duplicates the getWorkunits method; can we
extract the common code?
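The shared code could be extracted into one private method that takes the partition filter as an Optional. getWorkunits would pass an empty filter, and getWorkunitsForFilteredPartitions would pass the map it receives. A simplified sketch under those assumptions: topics map to integer partition ids, the filter map holds partition ids as strings (matching the Map<String, List<String>> parameter in the diff), and work units are reduced to "topic-partition" strings. The class and helper names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: both entry points share one planning method.
final class SharedWorkUnitPlanning {
  private SharedWorkUnitPlanning() {}

  // Full run: every partition of every topic is eligible.
  static Map<String, List<String>> getWorkunits(Map<String, List<Integer>> allPartitions) {
    return createWorkUnits(allPartitions, Optional.empty());
  }

  // Partial run: only the listed topic partitions are eligible.
  static Map<String, List<String>> getWorkunitsForFilteredPartitions(
      Map<String, List<Integer>> allPartitions, Map<String, List<String>> filteredTopicPartitionMap) {
    return createWorkUnits(allPartitions, Optional.of(filteredTopicPartitionMap));
  }

  // The common body both entry points delegate to.
  private static Map<String, List<String>> createWorkUnits(
      Map<String, List<Integer>> allPartitions, Optional<Map<String, List<String>>> filter) {
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Integer>> e : allPartitions.entrySet()) {
      String topic = e.getKey();
      if (filter.isPresent() && !filter.get().containsKey(topic)) {
        continue; // topic not selected
      }
      List<String> units = new ArrayList<>();
      for (int partition : e.getValue()) {
        boolean selected = filter
            .map(f -> f.get(topic).contains(String.valueOf(partition)))
            .orElse(true);
        if (selected) {
          units.add(topic + "-" + partition); // placeholder for a real WorkUnit
        }
      }
      result.put(topic, units);
    }
    return result;
  }
}
```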
Issue Time Tracking
-------------------
Worklog Id: (was: 884922)
Time Spent: 0.5h (was: 20m)
> Create work units for selected topics partitions in Kafka source
> ----------------------------------------------------------------
>
> Key: GOBBLIN-1922
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1922
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-kafka
> Reporter: Hanghang Liu
> Assignee: Shirshanka Das
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Add a new feature to the Kafka source that creates work units for selected topic
> partitions. This feature provides functionality to:
> # Recompute and split tasks (work units) at runtime for selected topic partitions, instead of recomputing them for all Kafka topics
> # Make topic-level replanning feasible
> # Move one step closer to dynamic work unit allocation
> A follow-up is needed so the work unit packer can pack the recomputed
> work units into a desired number of containers.
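The follow-up is a packing problem: distribute recomputed work units across a fixed number of containers so that loads stay balanced. One standard heuristic is greedy longest-processing-time assignment, which places the largest unit first, always into the currently lightest container. The sketch is a standalone illustration, not Gobblin's packer code. GreedyContainerPacker and pack are hypothetical names, and plain double sizes stand in for work unit size estimates:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of packing work units into exactly numContainers bins.
final class GreedyContainerPacker {
  private GreedyContainerPacker() {}

  /** Returns, for each container, the indices of the work units assigned to it. */
  static List<List<Integer>> pack(double[] sizes, int numContainers) {
    if (numContainers <= 0) {
      throw new IllegalArgumentException("numContainers must be positive");
    }
    List<List<Integer>> bins = new ArrayList<>();
    double[] loads = new double[numContainers];
    // Lightest container first; ties broken by container index.
    PriorityQueue<Integer> lightest = new PriorityQueue<>(
        Comparator.<Integer>comparingDouble(b -> loads[b]).thenComparingInt(b -> b));
    for (int b = 0; b < numContainers; b++) {
      bins.add(new ArrayList<>());
      lightest.add(b);
    }
    // Visit work units from largest to smallest (stable for equal sizes).
    List<Integer> order = new ArrayList<>();
    for (int i = 0; i < sizes.length; i++) {
      order.add(i);
    }
    order.sort(Comparator.<Integer>comparingDouble(i -> sizes[i]).reversed());
    for (int unit : order) {
      int bin = lightest.poll(); // remove before changing its load so the heap stays valid
      bins.get(bin).add(unit);
      loads[bin] += sizes[unit];
      lightest.add(bin);
    }
    return bins;
  }
}
```

With sizes {5, 4, 3, 3, 1} and two containers, this puts units 0 and 3 in one container and units 1, 2, and 4 in the other, for loads of 8 and 8.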
--
This message was sent by Atlassian Jira
(v8.20.10#820010)