[ https://issues.apache.org/jira/browse/GOBBLIN-1922?focusedWorklogId=887287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-887287 ]

ASF GitHub Bot logged work on GOBBLIN-1922:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Oct/23 20:55
            Start Date: 25/Oct/23 20:55
    Worklog Time Spent: 10m 
      Work Description: hanghangliu commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1372316705


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -257,32 +277,42 @@ public String apply(KafkaTopic topic) {
       Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
 
       for (KafkaTopic topic : topics) {
+        LOG.info("Discovered topic " + topic);
+        if (topic.getTopicSpecificState().isPresent()) {
+          topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
+              .addAllIfNotExist(topic.getTopicSpecificState().get());
+        }
+        Optional<Set<Integer>> partitionIDSet = Optional.absent();
+        if(filteredTopicPartition.isPresent()) {
+          List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
+              .orElse(new ArrayList<>());
+          partitionIDSet = Optional.of(new HashSet<>(list));
+          LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
+              topic.getName(), list.size());
+        }
+
         threadPool.submit(
             new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
-                workUnits));
+                kafkaTopicWorkunitMap, partitionIDSet));
       }
 
       ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
-      LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
+      LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
           createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));
 
       // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
       // but aren't processed).
-      createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-      //determine the number of mappers
-      int maxMapperNum =
-          state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+      createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);

Review Comment:
   Added a condition to check whether filteredTopicPartition is present; the call is only invoked if it is not present.
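The control flow in the hunk and the guard described in the review comment can be sketched in plain Java. This is a minimal sketch under simplifying assumptions: a topic is just a name plus partition IDs, a "work unit" is a `topic-partition` string, and `WorkUnitSketch`/`createWorkUnits` are hypothetical names, not Gobblin's `WorkUnitCreator`, `WorkUnit`, or `createEmptyWorkUnitsForSkippedPartitions`. It fans topics out to a thread pool, collects results into a concurrent map keyed by topic name (mirroring `kafkaTopicWorkunitMap`), restricts partitions when a filter is present, and only backfills empty work units for skipped partitions when no filter is given. The assumed rationale for the guard is that, when only a subset of partitions is recomputed, the unselected partitions would otherwise look "skipped" and receive empty work units.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the KafkaSource flow; not Gobblin's API.
// A "work unit" here is just a "topic-partition" label.
class WorkUnitSketch {

  /**
   * @param topics          topic name -> partition IDs discovered for that topic
   * @param filter          optional topic name -> partition IDs selected for (re)computation
   * @param previousOffsets topic name -> partition IDs with previously committed offsets
   * @return topic name -> work unit labels; "empty:" marks a placeholder for a skipped partition
   */
  static Map<String, List<String>> createWorkUnits(
      Map<String, List<Integer>> topics,
      Optional<Map<String, List<Integer>>> filter,
      Map<String, Set<Integer>> previousOffsets) {
    Map<String, List<String>> workUnitsByTopic = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(4);

    for (Map.Entry<String, List<Integer>> topic : topics.entrySet()) {
      // With a filter, keep only the selected partitions (none if the topic is absent from it).
      final Optional<Set<Integer>> partitionIds;
      if (filter.isPresent()) {
        Set<Integer> selected = new HashSet<>(
            filter.get().getOrDefault(topic.getKey(), Collections.emptyList()));
        partitionIds = Optional.of(selected);
      } else {
        partitionIds = Optional.empty();
      }

      // One task per topic; each task writes only its own key into the concurrent map.
      pool.execute(() -> {
        List<String> units = new ArrayList<>();
        for (int p : topic.getValue()) {
          if (!partitionIds.isPresent() || partitionIds.get().contains(p)) {
            units.add(topic.getKey() + "-" + p);
          }
        }
        workUnitsByTopic.put(topic.getKey(), units);
      });
    }

    // Stop accepting tasks and wait for the submitted ones to finish.
    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.HOURS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while creating work units", e);
    }

    // Backfill empty work units for skipped partitions only when no filter is present;
    // with a filter, unselected partitions are deliberately left out.
    if (!filter.isPresent()) {
      for (Map.Entry<String, Set<Integer>> prev : previousOffsets.entrySet()) {
        List<String> units =
            workUnitsByTopic.computeIfAbsent(prev.getKey(), k -> new ArrayList<>());
        for (int p : prev.getValue()) {
          String label = prev.getKey() + "-" + p;
          if (!units.contains(label)) {
            units.add("empty:" + label);
          }
        }
      }
    }
    return workUnitsByTopic;
  }
}
```

Each task writes a distinct key, so a `ConcurrentHashMap` is enough for thread safety during the fan-out; the backfill runs only after the pool has terminated, so mutating the per-topic lists there does not race with the workers.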





Issue Time Tracking
-------------------

    Worklog Id:     (was: 887287)
    Time Spent: 1h 40m  (was: 1.5h)

> Create work units for selected topics partitions in Kafka source
> ----------------------------------------------------------------
>
>                 Key: GOBBLIN-1922
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1922
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-kafka
>            Reporter: Hanghang Liu
>            Assignee: Shirshanka Das
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Add a new feature to the Kafka source that creates work units for selected
> topic partitions. This feature provides the following:
>  # Recompute and split tasks (work units) at runtime for the selected topic
> partitions, instead of recomputing them for all Kafka topics
>  # Make topic-level replanning feasible
>  # Move one step closer to dynamic work unit allocation
> A follow-up is needed to make the work unit packer able to pack the
> recomputed work units into a desired number of containers.
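The core idea of the description, recomputing work units for selected topics while leaving the rest of the plan untouched, can be illustrated with a small generic helper. This is a minimal sketch assuming the plan is a map from topic name to its work units; `TopicReplanSketch`, `replan`, and `recomputeTopic` are hypothetical names and not part of Gobblin.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of topic-level replanning; not part of Gobblin's API.
class TopicReplanSketch {

  /**
   * Returns a new plan in which only the selected topics are recomputed;
   * work units for every other topic are carried over unchanged.
   */
  static <W> Map<String, List<W>> replan(
      Map<String, List<W>> currentPlan,
      Set<String> selectedTopics,
      Function<String, List<W>> recomputeTopic) {
    // Copy so the previous plan stays intact.
    Map<String, List<W>> next = new HashMap<>(currentPlan);
    for (String topic : selectedTopics) {
      // Only the selected topics pay the cost of work unit creation.
      next.put(topic, recomputeTopic.apply(topic));
    }
    return next;
  }
}
```

Returning a fresh map rather than mutating `currentPlan` keeps the previous plan available for comparison or rollback. Packing the recomputed work units into containers is the follow-up mentioned in the description and is not modeled here.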



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
