[
https://issues.apache.org/jira/browse/GOBBLIN-1922?focusedWorklogId=887297&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887297
]
ASF GitHub Bot logged work on GOBBLIN-1922:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Oct/23 21:33
Start Date: 25/Oct/23 21:33
Worklog Time Spent: 10m
Work Description: hanghangliu commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1372346916
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -257,32 +277,42 @@ public String apply(KafkaTopic topic) {
     Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
     for (KafkaTopic topic : topics) {
+      LOG.info("Discovered topic " + topic);
+      if (topic.getTopicSpecificState().isPresent()) {
+        topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
+            .addAllIfNotExist(topic.getTopicSpecificState().get());
+      }
+      Optional<Set<Integer>> partitionIDSet = Optional.absent();
+      if (filteredTopicPartition.isPresent()) {
+        List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
+            .orElse(new ArrayList<>());
+        partitionIDSet = Optional.of(new HashSet<>(list));
+        LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
+            topic.getName(), list.size());
+      }
+
       threadPool.submit(
           new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
-              workUnits));
+              kafkaTopicWorkunitMap, partitionIDSet));
     }
     ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
-    LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
+    LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
         createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
     // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
     // but aren't processed).
-    createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-    //determine the number of mappers
-    int maxMapperNum =
-        state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+    createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
+
     KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
-    int numOfMultiWorkunits = maxMapperNum;
-    if (state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
-      double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
-      LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
-      double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
-      numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
-      numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+    int numOfMultiWorkunits = minContainer.or(0);
+    if (state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
+      numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
+          calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
     }
-    addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
-    List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
+
+    addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
+    List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
Review Comment:
Take this example. Before calling
[squeezeMultiWorkUnits](https://github.com/apache/gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java#L231),
mwuGroups contains multiWUs such as:
topicA: partitions 0-7
topicA: partitions 8-15
topicB: partitions 0-7
Say we set numContainers to 5; we can then further split the multiWUs into:
topicA: partitions 0-3
topicA: partitions 4-7
topicA: partitions 8-15
topicB: partitions 0-3
topicB: partitions 4-7
Then we squeeze them into 5 WUs, which results in 5 containers. Do you think
this process makes sense?
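The split-then-squeeze flow above can be sketched roughly as follows. This is hypothetical illustration code, not Gobblin's actual `KafkaTopicGroupingWorkUnitPacker`; the class name `SplitThenSqueeze` and the halve-the-largest-group strategy are assumptions made for the example, and each inner list simply stands in for one multiWU's partition IDs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the split-then-squeeze idea; NOT Gobblin code.
// Each inner List<Integer> stands in for one multiWU and holds the
// partition IDs it covers.
public class SplitThenSqueeze {

  // Split the largest group in half until we have numContainers groups,
  // or until every group holds a single partition and cannot be split.
  static List<List<Integer>> split(List<List<Integer>> groups, int numContainers) {
    List<List<Integer>> result = new ArrayList<>(groups);
    while (result.size() < numContainers) {
      List<Integer> largest =
          result.stream().max(Comparator.comparingInt(List::size)).get();
      if (largest.size() < 2) {
        break;  // nothing left to split
      }
      result.remove(largest);
      int mid = largest.size() / 2;
      result.add(new ArrayList<>(largest.subList(0, mid)));
      result.add(new ArrayList<>(largest.subList(mid, largest.size())));
    }
    return result;
  }

  public static void main(String[] args) {
    // The example from the comment: topicA p0-7, topicA p8-15, topicB p0-7.
    List<List<Integer>> groups = new ArrayList<>();
    groups.add(new ArrayList<>(List.of(0, 1, 2, 3, 4, 5, 6, 7)));       // topicA
    groups.add(new ArrayList<>(List.of(8, 9, 10, 11, 12, 13, 14, 15))); // topicA
    groups.add(new ArrayList<>(List.of(0, 1, 2, 3, 4, 5, 6, 7)));       // topicB
    List<List<Integer>> splitGroups = split(groups, 5);
    // Squeezing then assigns each group to one container: 5 groups -> 5 containers.
    System.out.println("containers: " + splitGroups.size()); // prints "containers: 5"
  }
}
```

With the three 8-partition multiWUs above and numContainers = 5, two halvings produce five groups, matching the five WUs in the example.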
Issue Time Tracking
-------------------
Worklog Id: (was: 887297)
Time Spent: 2h 40m (was: 2.5h)
> Create work units for selected topics partitions in Kafka source
> ----------------------------------------------------------------
>
> Key: GOBBLIN-1922
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1922
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-kafka
> Reporter: Hanghang Liu
> Assignee: Shirshanka Das
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Add a new feature in the Kafka source to create workUnits for selected topic
> partitions. This feature provides functionality to:
> # Recompute and split tasks (workUnits) at runtime for selected topic
> partitions, instead of recomputing for all Kafka topics
> # Make topic-level replanning feasible
> # Move one step closer to dynamic work unit allocation
> A follow-up is needed to make the work unit packer able to pack the recomputed
> workunits into a desired number of containers
--
This message was sent by Atlassian Jira
(v8.20.10#820010)