[ 
https://issues.apache.org/jira/browse/GOBBLIN-1922?focusedWorklogId=887296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887296
 ]

ASF GitHub Bot logged work on GOBBLIN-1922:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Oct/23 21:32
            Start Date: 25/Oct/23 21:32
    Worklog Time Spent: 10m 
      Work Description: hanghangliu commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1372346916


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -257,32 +277,42 @@ public String apply(KafkaTopic topic) {
       Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
 
       for (KafkaTopic topic : topics) {
+        LOG.info("Discovered topic " + topic);
+        if (topic.getTopicSpecificState().isPresent()) {
+          topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
+              .addAllIfNotExist(topic.getTopicSpecificState().get());
+        }
+        Optional<Set<Integer>> partitionIDSet = Optional.absent();
+        if(filteredTopicPartition.isPresent()) {
+          List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
+              .orElse(new ArrayList<>());
+          partitionIDSet = Optional.of(new HashSet<>(list));
+          LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
+              topic.getName(), list.size());
+        }
+
         threadPool.submit(
             new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
-                workUnits));
+                kafkaTopicWorkunitMap, partitionIDSet));
       }
 
       ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
-      LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
+      LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
           createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
 
       // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
       // but aren't processed).
-      createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-      //determine the number of mappers
-      int maxMapperNum =
-          state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+      createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
+
       KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
-      int numOfMultiWorkunits = maxMapperNum;
-      if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
-        double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
-        LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
-        double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
-        numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
-        numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+      int numOfMultiWorkunits = minContainer.or(0);
+      if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
+        numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
+            calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
       }
-      addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
-      List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
+
+      addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
+      List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
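The container-count selection in the hunk above (start from an optional minimum-container floor, then raise it to the packer's estimate when a max-mappers key is configured) can be sketched roughly as follows. This is an illustrative standalone sketch, not Gobblin's API: `minContainer` models the Guava `Optional<Integer>` in the PR, `hasMaxMappersKey` stands in for `state.contains(MR_JOB_MAX_MAPPERS_KEY)`, and `packerEstimate` stands in for the result of `calculateNumMappersForPacker(...)`:

```java
// Hedged sketch of the numOfMultiWorkunits selection in the diff above.
// All names here are illustrative stand-ins for the PR's state lookups.
public class NumWorkunitsSketch {
  public static int numOfMultiWorkunits(Integer minContainer, boolean hasMaxMappersKey,
      int packerEstimate) {
    // minContainer.or(0): fall back to 0 when no minimum is configured
    int n = (minContainer != null) ? minContainer : 0;
    if (hasMaxMappersKey) {
      // the packer estimate applies, but never drop below the configured floor
      n = Math.max(n, packerEstimate);
    }
    return n;
  }
}
```

The effect is that a configured minimum container count always wins over a smaller packer estimate, while the estimate is ignored entirely when no max-mappers key is set.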

Review Comment:
   Take this example: before 
[squeezeMultiWorkUnits](https://github.com/apache/gobblin/blob/master/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java#L231)
 is called, mwuGroups contains multiWUs like:
   topicA: partitions 0-7
   topicA: partitions 8-15
   topicB: partitions 0-7
   
   Say we set numContainers to 5; we can then further split the multiWUs into:
   topicA: partitions 0-3
   topicA: partitions 4-7
   topicA: partitions 8-15
   topicB: partitions 0-3
   topicB: partitions 4-7
   Then we squeeze them into 5 WUs, which results in 5 containers. Do you think this process makes sense?
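   A minimal sketch of the split step described above, under simplifying assumptions: each multi-workunit is modeled as just a `[startPartition, endPartition]` range, and we repeatedly halve the widest range until we reach the target count (the real packer works on estimated workunit sizes, and `MultiWorkUnitSplitter`/`splitToAtLeast` are hypothetical names, not Gobblin code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of splitting multi-workunits to reach a target
// container count, before a "squeeze" step packs them one per container.
public class MultiWorkUnitSplitter {
  // Each int[] is a [startPartition, endPartition] range for one multiWU.
  public static List<int[]> splitToAtLeast(List<int[]> ranges, int numContainers) {
    List<int[]> result = new ArrayList<>(ranges);
    while (result.size() < numContainers) {
      // pick the widest range as the split candidate
      int widest = 0;
      for (int i = 1; i < result.size(); i++) {
        if (width(result.get(i)) > width(result.get(widest))) {
          widest = i;
        }
      }
      int[] r = result.remove(widest);
      if (width(r) < 2) {
        // a single partition cannot be split further; give up early
        result.add(r);
        break;
      }
      int mid = r[0] + (r[1] - r[0]) / 2;
      result.add(new int[]{r[0], mid});
      result.add(new int[]{mid + 1, r[1]});
    }
    return result;
  }

  private static int width(int[] r) {
    return r[1] - r[0] + 1;
  }
}
```

   With the three multiWUs from the example (topicA 0-7, topicA 8-15, topicB 0-7) and a target of 5, this yields 5 ranges covering the same 24 partitions, which a squeeze to 5 WUs then maps onto 5 containers.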





Issue Time Tracking
-------------------

    Worklog Id:     (was: 887296)
    Time Spent: 2.5h  (was: 2h 20m)

> Create work units for selected topics partitions in Kafka source
> ----------------------------------------------------------------
>
>                 Key: GOBBLIN-1922
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1922
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-kafka
>            Reporter: Hanghang Liu
>            Assignee: Shirshanka Das
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Add a new feature in the Kafka source to create work units for selected topic 
> partitions. This feature:
>  # Recomputes and splits tasks (work units) at runtime for selected topic 
> partitions, instead of recomputing them for all Kafka topics
>  # Makes topic-level replanning feasible
>  # Brings us one step closer to dynamic work unit allocation
> A followup is needed to make the work unit packer able to pack the recomputed 
> work units into the desired number of containers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
