This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9e30c6c3b Make KafkaTopicGroupingWorkUnitPacker pack with desired num
of container (#3814)
9e30c6c3b is described below
commit 9e30c6c3bca857942fba5d10345b1be14adc1942
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Wed Nov 8 11:05:04 2023 -0800
Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container
(#3814)
* Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container
* update comment
---
.../gobblin/source/workunit/MultiWorkUnit.java | 11 +++
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 46 +++++++++++++
.../KafkaTopicGroupingWorkUnitPackerTest.java | 80 ++++++++++++++++++++--
3 files changed, 133 insertions(+), 4 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
index 1392fa97f..9d530f811 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
@@ -160,4 +160,15 @@ public class MultiWorkUnit extends WorkUnit {
public static MultiWorkUnit createEmpty() {
return new MultiWorkUnit();
}
+
+ /**
+ * Create a new {@link MultiWorkUnit} instance based on provided collection
of {@link WorkUnit}s.
+ *
+ * @return the {@link MultiWorkUnit} instance containing the provided collection
of {@link WorkUnit}s.
+ */
+ public static MultiWorkUnit createMultiWorkUnit(Collection<WorkUnit>
workUnits) {
+ MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+ multiWorkUnit.addWorkUnits(workUnits);
+ return multiWorkUnit;
+ }
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 1bdc2d8bb..9c22d47cc 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -20,9 +20,11 @@ package
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;
@@ -172,6 +174,9 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
* - For each topic pack the workunits into a set of topic specific buckets
by filling the fullest bucket that can hold
* the workunit without exceeding the container capacity.
* - The topic specific multi-workunits are squeezed and returned as a
workunit.
+ *
+ * @param numContainers desired number of containers, which will be the size
of the returned List<WorkUnit>. The actual
+ * number can be smaller or bigger depending on container
capacity and total workUnit/partition count
*/
@Override
public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int
numContainers) {
@@ -230,6 +235,11 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
}
}
+ // If size of mwuGroups is smaller than numContainers, try to further
split the multi WU to respect the container number requirement
+ if(mwuGroups.size() < numContainers) {
+ mwuGroups = splitMultiWorkUnits(mwuGroups, numContainers);
+ }
+
List<WorkUnit> squeezedGroups = squeezeMultiWorkUnits(mwuGroups);
log.debug("Squeezed work unit groups: " + squeezedGroups);
return squeezedGroups;
@@ -383,4 +393,40 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
throw new RuntimeException("Unsupported computation strategy: " +
strategy.name());
}
}
+
+ /**
+ * A method that splits a list of {@link MultiWorkUnit} to the size of
desiredWUSize if possible. The approach is to
+ * evenly split the {@link WorkUnit}s within an MWU into two, always trying
to split the MWU with the most partitions first.
+ * Stop when each {@link MultiWorkUnit} only contains a single {@link
WorkUnit}, as further splitting is not possible.
+ * @param multiWorkUnits the list of {@link MultiWorkUnit} to be split
+ * @param desiredWUSize desired number of {@link MultiWorkUnit}s
+ * @return the split list of {@link MultiWorkUnit}s
+ */
+ public static List<MultiWorkUnit> splitMultiWorkUnits(List<MultiWorkUnit>
multiWorkUnits, int desiredWUSize) {
+ PriorityQueue<MultiWorkUnit> pQueue = new PriorityQueue<>(
+ Comparator.comparing(mwu -> mwu.getWorkUnits().size(),
Comparator.reverseOrder()));
+ pQueue.addAll(multiWorkUnits);
+
+ while(pQueue.size() < desiredWUSize) {
+ MultiWorkUnit mwu = pQueue.poll();
+ int size = mwu.getWorkUnits().size();
+ // If the size is smaller than 2, meaning each mwu only contains a
single wu and can't be further split.
+ // Add back the polled element and stop the loop.
+ if(size < 2) {
+ pQueue.add(mwu);
+ break;
+ }
+ // Split the mwu into 2 with evenly distributed wu
+
pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(0,
size/2)));
+
pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(size/2,
size)));
+ }
+
+ log.info("Min size of the container is set to {}, successfully split the
multi workunit to {}", desiredWUSize, pQueue.size());
+
+ // If the size is unchanged, no split could be done. Return the original
list to avoid constructing a new list
+ if(multiWorkUnits.size() == pQueue.size()) {
+ return multiWorkUnits;
+ }
+ return new ArrayList<>(pQueue);
+ }
}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
index b44df38ba..d45e0bf1c 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@ import
org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -51,7 +53,7 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
}
/**
- * Check that capacity is honored.
+ * Check that capacity is honored. Set numContainers to 0 so the workUnit
list size is determined only by the capacity
*/
@Test
public void testSingleTopic() {
@@ -64,7 +66,7 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
.newArrayList(getWorkUnitWithTopicPartition("topic1", 1),
getWorkUnitWithTopicPartition("topic1", 2),
getWorkUnitWithTopicPartition("topic1", 3)));
- List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source,
state, Optional.absent()).pack(workUnitsByTopic, 10);
+ List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source,
state, Optional.absent()).pack(workUnitsByTopic, 0);
Assert.assertEquals(workUnits.size(), 2);
Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME),
"topic1");
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID,
0)), 1);
@@ -91,7 +93,7 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
.newArrayList(getWorkUnitWithTopicPartition("topic2", 1),
getWorkUnitWithTopicPartition("topic2", 2),
getWorkUnitWithTopicPartition("topic2", 3)));
- List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source,
state, Optional.absent()).pack(workUnitsByTopic, 10);
+ List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source,
state, Optional.absent()).pack(workUnitsByTopic, 0);
Assert.assertEquals(workUnits.size(), 4);
Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME),
"topic1");
@@ -113,8 +115,49 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY),
2, 0.001);
}
+ @Test
+ public void testMultiTopicWithNumContainers() {
+ KafkaSource source = new UniversalKafkaSource();
+ SourceState state = new SourceState(new State(props));
+ state.setProp("gobblin.kafka.streaming.enableIndexing", true);
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR,
Files.createTempDir().getAbsolutePath());
+
+ Map<String, List<WorkUnit>> workUnitsByTopic = ImmutableMap.of(
+ "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1",
1),
+ getWorkUnitWithTopicPartition("topic1", 2)),
+ "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2",
1),
+ getWorkUnitWithTopicPartition("topic2", 2),
+ getWorkUnitWithTopicPartition("topic2", 3),
+ getWorkUnitWithTopicPartition("topic2", 4)));
+ KafkaTopicGroupingWorkUnitPacker packer = new
KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent());
+ List<WorkUnit> workUnits = packer.pack(workUnitsByTopic, 5);
+ Assert.assertEquals(workUnits.size(), 5);
+
+ int partitionCount = 0;
+ for(WorkUnit workUnit : workUnits) {
+ partitionCount += KafkaUtils.getPartitions(workUnit).size();
+ }
+ Assert.assertEquals(partitionCount, 6);
+
+ workUnitsByTopic = ImmutableMap.of(
+ "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1",
1),
+ getWorkUnitWithTopicPartition("topic1", 2)),
+ "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2",
1),
+ getWorkUnitWithTopicPartition("topic2", 2),
+ getWorkUnitWithTopicPartition("topic2", 3),
+ getWorkUnitWithTopicPartition("topic2", 4)));
+ workUnits = packer.pack(workUnitsByTopic, 7);
+ // Total WU count can't exceed 6, the total number of partitions
+ Assert.assertEquals(workUnits.size(), 6);
+ partitionCount = 0;
+ for(WorkUnit workUnit : workUnits) {
+ partitionCount += KafkaUtils.getPartitions(workUnit).size();
+ }
+ Assert.assertEquals(partitionCount, 6);
+ }
- public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {
+
+ public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition)
{
WorkUnit workUnit = new WorkUnit(new
Extract(Extract.TableType.APPEND_ONLY, "kafka", topic));
workUnit.setProp(KafkaSource.TOPIC_NAME, topic);
workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition));
@@ -159,4 +202,33 @@ public class KafkaTopicGroupingWorkUnitPackerTest {
capacity =
KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities,
strategy);
Assert.assertEquals(capacity, 1.35, delta);
}
+
+ @Test
+ public void testSplitMultiWorkUnits() {
+ // Create a list of 2 MWU, each contains 3 WU within
+ List<MultiWorkUnit> multiWorkUnitList = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
+ for (int j = 0; j < 3; j++) {
+ multiWorkUnit.addWorkUnit(WorkUnit.createEmpty());
+
+ }
+ multiWorkUnitList.add(multiWorkUnit);
+ }
+
+ // desiredWUSize is smaller than the MWU list size, so the result should keep
the original size of the MWU list
+ List<MultiWorkUnit> mwuList =
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 1);
+ Assert.assertEquals(mwuList.size(), 2);
+
+ mwuList =
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 3);
+ Assert.assertEquals(mwuList.size(), 3);
+
+ mwuList =
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 6);
+ Assert.assertEquals(mwuList.size(), 6);
+
+ // desiredWUSize is bigger than the combined number of all WUs, so the result
will be the total WU count
+ mwuList =
KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 7);
+ Assert.assertEquals(mwuList.size(), 6);
+ }
+
}
\ No newline at end of file