This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e30c6c3b Make KafkaTopicGroupingWorkUnitPacker pack with desired num 
of container (#3814)
9e30c6c3b is described below

commit 9e30c6c3bca857942fba5d10345b1be14adc1942
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Wed Nov 8 11:05:04 2023 -0800

    Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container 
(#3814)
    
    * Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container
    
    * update comment
---
 .../gobblin/source/workunit/MultiWorkUnit.java     | 11 +++
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   | 46 +++++++++++++
 .../KafkaTopicGroupingWorkUnitPackerTest.java      | 80 ++++++++++++++++++++--
 3 files changed, 133 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
index 1392fa97f..9d530f811 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
@@ -160,4 +160,15 @@ public class MultiWorkUnit extends WorkUnit {
   public static MultiWorkUnit createEmpty() {
     return new MultiWorkUnit();
   }
+
+  /**
+   * Create a new {@link MultiWorkUnit} instance based on the provided collection of {@link WorkUnit}s.
+   *
+   * @param workUnits the {@link WorkUnit}s to add to the new instance
+   * @return the {@link MultiWorkUnit} instance with the provided collection of {@link WorkUnit}s.
+   */
+  public static MultiWorkUnit createMultiWorkUnit(Collection<WorkUnit> 
workUnits) {
+    MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+    multiWorkUnit.addWorkUnits(workUnits);
+    return multiWorkUnit;
+  }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 1bdc2d8bb..9c22d47cc 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -20,9 +20,11 @@ package 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import java.util.PriorityQueue;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.hadoop.fs.Path;
 
@@ -172,6 +174,9 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
    * - For each topic pack the workunits into a set of topic specific buckets 
by filling the fullest bucket that can hold
    *   the workunit without exceeding the container capacity.
    * - The topic specific multi-workunits are squeezed and returned as a 
workunit.
+   *
+   * @param numContainers desired number of containers, which will be the size of the returned List<WorkUnit>. The actual
+   *                      number can be smaller or larger depending on the container capacity and the total workUnit/partition count
    */
   @Override
   public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int 
numContainers) {
@@ -230,6 +235,11 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
       }
     }
 
+    // If the size of mwuGroups is smaller than numContainers, try to further split the multi-workunits to honor the requested container count
+    if(mwuGroups.size() < numContainers) {
+      mwuGroups = splitMultiWorkUnits(mwuGroups, numContainers);
+    }
+
     List<WorkUnit> squeezedGroups = squeezeMultiWorkUnits(mwuGroups);
     log.debug("Squeezed work unit groups: " + squeezedGroups);
     return squeezedGroups;
@@ -383,4 +393,40 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
         throw new RuntimeException("Unsupported computation strategy: " + 
strategy.name());
     }
   }
+
+  /**
+   * Splits a list of {@link MultiWorkUnit}s into desiredWUSize groups if possible. The approach is to
+   * evenly split the {@link WorkUnit}s within a MWU into two, always splitting the MWU with the most partitions first.
+   * Stops when each {@link MultiWorkUnit} contains only a single {@link WorkUnit}, as further splitting is not possible.
+   * @param multiWorkUnits the list of {@link MultiWorkUnit}s to be split
+   * @param desiredWUSize desired number of {@link MultiWorkUnit}s
+   * @return the split list of {@link MultiWorkUnit}s
+   */
+  public static List<MultiWorkUnit> splitMultiWorkUnits(List<MultiWorkUnit> 
multiWorkUnits, int desiredWUSize) {
+    PriorityQueue<MultiWorkUnit> pQueue = new PriorityQueue<>(
+        Comparator.comparing(mwu -> mwu.getWorkUnits().size(), 
Comparator.reverseOrder()));
+    pQueue.addAll(multiWorkUnits);
+
+    while(pQueue.size() < desiredWUSize) {
+      MultiWorkUnit mwu = pQueue.poll();
+      int size = mwu.getWorkUnits().size();
+      // If the size is smaller than 2, this mwu contains only a single wu and can't be further split.
+      // Add back the polled element and stop the loop.
+      if(size < 2) {
+        pQueue.add(mwu);
+        break;
+      }
+      // Split the mwu into 2 with evenly distributed wu
+      
pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(0, 
size/2)));
+      
pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(size/2, 
size)));
+    }
+
+    log.info("Min size of the container is set to {}, successfully split the 
multi workunit to {}", desiredWUSize, pQueue.size());
+
+    // If the size is unchanged, no split was done. Return the original list to avoid constructing a new one
+    if(multiWorkUnits.size() == pQueue.size()) {
+      return multiWorkUnits;
+    }
+    return new ArrayList<>(pQueue);
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
index b44df38ba..d45e0bf1c 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
 import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
 import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -51,7 +53,7 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
   }
 
   /**
-   * Check that capacity is honored.
+   * Check that capacity is honored. Set numContainers to 0 so the workUnit 
list size is determined only by the capacity
    */
   @Test
   public void testSingleTopic() {
@@ -64,7 +66,7 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
         .newArrayList(getWorkUnitWithTopicPartition("topic1", 1), 
getWorkUnitWithTopicPartition("topic1", 2),
             getWorkUnitWithTopicPartition("topic1", 3)));
 
-    List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, 
state, Optional.absent()).pack(workUnitsByTopic, 10);
+    List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, 
state, Optional.absent()).pack(workUnitsByTopic, 0);
     Assert.assertEquals(workUnits.size(), 2);
     Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), 
"topic1");
     
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
 0)), 1);
@@ -91,7 +93,7 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
         .newArrayList(getWorkUnitWithTopicPartition("topic2", 1), 
getWorkUnitWithTopicPartition("topic2", 2),
             getWorkUnitWithTopicPartition("topic2", 3)));
 
-    List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, 
state, Optional.absent()).pack(workUnitsByTopic, 10);
+    List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, 
state, Optional.absent()).pack(workUnitsByTopic, 0);
     Assert.assertEquals(workUnits.size(), 4);
 
     Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), 
"topic1");
@@ -113,8 +115,49 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
     
Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
 2, 0.001);
   }
 
+  @Test
+  public void testMultiTopicWithNumContainers() {
+    KafkaSource source = new UniversalKafkaSource();
+    SourceState state = new SourceState(new State(props));
+    state.setProp("gobblin.kafka.streaming.enableIndexing", true);
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, 
Files.createTempDir().getAbsolutePath());
+
+    Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of(
+        "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 
1),
+            getWorkUnitWithTopicPartition("topic1", 2)),
+        "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 
1),
+            getWorkUnitWithTopicPartition("topic2", 2),
+            getWorkUnitWithTopicPartition("topic2", 3),
+            getWorkUnitWithTopicPartition("topic2", 4)));
+    KafkaTopicGroupingWorkUnitPacker packer = new 
KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent());
+    List<WorkUnit> workUnits = packer.pack(workUnitsByTopic, 5);
+    Assert.assertEquals(workUnits.size(), 5);
+
+    int partitionCount = 0;
+    for(WorkUnit workUnit : workUnits) {
+      partitionCount += KafkaUtils.getPartitions(workUnit).size();
+    }
+    Assert.assertEquals(partitionCount, 6);
+
+    workUnitsByTopic = ImmutableMap.of(
+        "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 
1),
+            getWorkUnitWithTopicPartition("topic1", 2)),
+        "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 
1),
+            getWorkUnitWithTopicPartition("topic2", 2),
+            getWorkUnitWithTopicPartition("topic2", 3),
+            getWorkUnitWithTopicPartition("topic2", 4)));
+    workUnits = packer.pack(workUnitsByTopic, 7);
+    // Total WU count won't be more than 6, the total number of partitions
+    Assert.assertEquals(workUnits.size(), 6);
+    partitionCount = 0;
+    for(WorkUnit workUnit : workUnits) {
+      partitionCount += KafkaUtils.getPartitions(workUnit).size();
+    }
+    Assert.assertEquals(partitionCount, 6);
+  }
 
-  public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {
+
+    public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) 
{
     WorkUnit workUnit = new WorkUnit(new 
Extract(Extract.TableType.APPEND_ONLY, "kafka", topic));
     workUnit.setProp(KafkaSource.TOPIC_NAME, topic);
     workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition));
@@ -159,4 +202,33 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
     capacity = 
KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, 
strategy);
     Assert.assertEquals(capacity, 1.35, delta);
   }
+
+  @Test
+  public void testSplitMultiWorkUnits() {
+    // Create a list of 2 MWUs, each containing 3 WUs
+    List<MultiWorkUnit> multiWorkUnitList = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
+      for (int j = 0; j < 3; j++) {
+        multiWorkUnit.addWorkUnit(WorkUnit.createEmpty());
+
+      }
+      multiWorkUnitList.add(multiWorkUnit);
+    }
+
+    // desiredWUSize is smaller than the MWU list size, so the result keeps the original list size
+    List<MultiWorkUnit> mwuList = 
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 1);
+    Assert.assertEquals(mwuList.size(), 2);
+
+    mwuList = 
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 3);
+    Assert.assertEquals(mwuList.size(), 3);
+
+    mwuList = 
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 6);
+    Assert.assertEquals(mwuList.size(), 6);
+
+    // desiredWUSize is bigger than the total number of WUs, so the result size will be the total WU count
+    mwuList = 
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 7);
+    Assert.assertEquals(mwuList.size(), 6);
+  }
+
 }
\ No newline at end of file

Reply via email to