[ 
https://issues.apache.org/jira/browse/GOBBLIN-1922?focusedWorklogId=884922&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-884922
 ]

ASF GitHub Bot logged work on GOBBLIN-1922:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Oct/23 03:10
            Start Date: 13/Oct/23 03:10
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3798:
URL: https://github.com/apache/gobblin/pull/3798#discussion_r1357701681


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -268,17 +271,11 @@ public String apply(KafkaTopic topic) {
       // Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
       // but aren't processed).
       createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
-      //determine the number of mappers
-      int maxMapperNum =
-          state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
+
       KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
-      int numOfMultiWorkunits = maxMapperNum;
-      if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
-        double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
-        LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
-        double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
-        numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
-        numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
+      int numOfMultiWorkunits= 0;
+      if(!(kafkaWorkUnitPacker instanceof KafkaTopicGroupingWorkUnitPacker)) {

Review Comment:
   Why do we need this check?
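For reference, the mapper-count logic removed in the hunk above behaves like the following minimal, self-contained sketch. The names mirror the diff; the class and the values in `main` are illustrative only:

```java
// Sketch of the mapper-count computation removed in the diff above.
// MapperCountSketch is a hypothetical stand-in, not a Gobblin class.
public class MapperCountSketch {
    static int numOfMultiWorkunits(double totalEstDataSize, double targetMapperSize, int maxMapperNum) {
        // Integer division of data size by target mapper size, plus one mapper
        // for the remainder, capped at the configured maximum mapper count.
        int n = (int) (totalEstDataSize / targetMapperSize) + 1;
        return Math.min(n, maxMapperNum);
    }

    public static void main(String[] args) {
        // 1000 units of data at 300 per mapper -> 4 mappers, under the cap of 10.
        System.out.println(numOfMultiWorkunits(1000.0, 300.0, 10));
    }
}
```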



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
     }
   }
 
+  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {

Review Comment:
   Add javadoc here?
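One possible shape for that javadoc (hypothetical wording; the parameter semantics are inferred from the signature in the diff and the surrounding discussion, not confirmed by the PR):

```java
/**
 * Creates work units only for the given subset of topic partitions, rather
 * than for every Kafka topic, e.g. when re-planning selected partitions at
 * runtime.
 *
 * @param state                     the source state for this job run
 * @param filteredTopicPartitionMap map from topic name to the partition ids
 *                                  that should be processed
 * @param minContainer              lower bound on the number of containers
 *                                  the packer should target
 * @return work units for the filtered partitions only
 */
```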



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/GobblinKafkaConsumerClient.java:
##########
@@ -169,6 +170,22 @@ default long committed(KafkaPartition partition) {
 
   public default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }
 
+  /**
+   * Get a list of all kafka topics
+   */
+  public default List<KafkaTopic> getTopics() {return Collections.emptyList();};

Review Comment:
   What's the reason for moving this method here?



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
     }
   }
 
+  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {

Review Comment:
   Why is minContainer needed here, and how will we determine it?



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -886,13 +997,24 @@ private class WorkUnitCreator implements Runnable {
     private final SourceState state;
     private final Optional<State> topicSpecificState;
     private final Map<String, List<WorkUnit>> allTopicWorkUnits;
+    private final Optional<Set<Integer>> filteredPartitionsId;
 
     WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
        Map<String, List<WorkUnit>> workUnits) {
       this.topic = topic;
       this.state = state;
       this.topicSpecificState = topicSpecificState;
       this.allTopicWorkUnits = workUnits;
+      this.filteredPartitionsId = Optional.absent();
+    }
+
+    WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,

Review Comment:
   Duplicate code; can we extract the common part?
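One way to address this is constructor delegation, so the shared field assignments live in a single place. A simplified sketch with stand-in types (note the real code uses Guava's `Optional.absent()`, while this sketch uses `java.util.Optional`):

```java
import java.util.Optional;
import java.util.Set;

// Simplified, hypothetical stand-in for WorkUnitCreator: the shorter
// constructor delegates to the full one, removing the duplicated assignments.
class WorkUnitCreatorSketch {
    final String topic;
    final Optional<Set<Integer>> filteredPartitionsId;

    WorkUnitCreatorSketch(String topic) {
        // Delegate to the full constructor with an absent partition filter.
        this(topic, Optional.empty());
    }

    WorkUnitCreatorSketch(String topic, Optional<Set<Integer>> filteredPartitionsId) {
        this.topic = topic;
        this.filteredPartitionsId = filteredPartitionsId;
    }
}
```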



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java:
##########
@@ -306,6 +303,96 @@ public String apply(KafkaTopic topic) {
     }
   }
 
+  public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {
+    Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();

Review Comment:
   I feel most of the code here duplicates the getWorkunits method; can we extract the common code?
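A possible shape for that extraction: both public entry points delegate to one private worker that takes an optional partition filter. This is a hypothetical sketch with simplified stand-in types, not the real Gobblin classes:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical refactoring shape: the unfiltered and filtered entry points
// share one private method, differing only in the filter they pass.
class KafkaSourceSketch {
    List<String> getWorkunits() {
        return createWorkUnits(Optional.empty());
    }

    List<String> getWorkunitsForFilteredPartitions(Map<String, List<String>> filter) {
        return createWorkUnits(Optional.of(filter));
    }

    private List<String> createWorkUnits(Optional<Map<String, List<String>>> filter) {
        // The common topic-discovery and packing logic would live here; when a
        // filter is present it restricts which topics get work units.
        return filter.map(f -> List.copyOf(f.keySet())).orElse(List.of("allTopics"));
    }
}
```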





Issue Time Tracking
-------------------

    Worklog Id:     (was: 884922)
    Time Spent: 0.5h  (was: 20m)

> Create work units for selected topics partitions in Kafka source
> ----------------------------------------------------------------
>
>                 Key: GOBBLIN-1922
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1922
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-kafka
>            Reporter: Hanghang Liu
>            Assignee: Shirshanka Das
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Add a new feature in Kafka source to create work units for selected topic 
> partitions. This feature provides functionality to:
>  # Recompute and split tasks (work units) at runtime for selected topic 
> partitions, instead of recomputing for all Kafka topics
>  # Make topic-level replanning feasible
>  # Move one step closer to dynamic work unit allocation
> A follow-up is needed to make the work unit packer able to pack the recomputed 
> work units into a desired number of containers



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
