This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ef038f667 [flink] Assign splits evenly in StaticFileStoreSplitEnumerator (#856)
ef038f667 is described below

commit ef038f667df505590e4ad9113b0a15e2756d0f71
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 11 10:41:08 2023 +0800

    [flink] Assign splits evenly in StaticFileStoreSplitEnumerator (#856)
---
 .../org/apache/paimon/utils/FixBinPacking.java     | 68 ++++++++++++++++++++++
 .../org/apache/paimon/utils/FixBinPackingTest.java | 39 +++++++++++++
 .../source/StaticFileStoreSplitEnumerator.java     | 10 ++--
 .../source/StaticFileStoreSplitEnumeratorTest.java |  4 +-
 4 files changed, 114 insertions(+), 7 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FixBinPacking.java b/paimon-common/src/main/java/org/apache/paimon/utils/FixBinPacking.java
new file mode 100644
index 000000000..c3f042b2e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FixBinPacking.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+
+import static java.util.Comparator.comparingLong;
+
+/**
+ * A greedy bin packing implementation for a fixed number of bins.
+ *
+ * <p>Items are visited in ascending weight order and each one is added to the bin whose total
+ * weight is currently the smallest.
+ */
+public class FixBinPacking {
+    private FixBinPacking() {}
+
+    /**
+     * Distributes {@code items} over at most {@code binNumber} bins.
+     *
+     * <p>Bins are created on demand, so when there are fewer items than {@code binNumber} the
+     * result contains one bin per item (and an empty list for empty input). The order of the
+     * returned bins is unspecified.
+     *
+     * @param items the items to pack
+     * @param weightFunc computes the weight of an item; it is evaluated exactly once per item and
+     *     must not return {@code null}
+     * @param binNumber the maximum number of bins, must be positive
+     * @return the packed bins, each holding its items in ascending weight order
+     * @throws IllegalArgumentException if {@code binNumber} is not positive
+     */
+    public static <T> List<List<T>> pack(
+            Iterable<T> items, Function<T, Long> weightFunc, int binNumber) {
+        if (binNumber <= 0) {
+            // Otherwise bins.poll() below would return null and fail with an opaque NPE.
+            throw new IllegalArgumentException(
+                    "binNumber must be positive, but was " + binNumber);
+        }
+
+        // 1. evaluate each weight once, then sort by weight (List.sort is stable, so items with
+        // equal weight keep their input order)
+        List<Weighted<T>> sorted = new ArrayList<>();
+        for (T item : items) {
+            sorted.add(new Weighted<>(item, weightFunc.apply(item)));
+        }
+        sorted.sort(comparingLong((Weighted<T> w) -> w.weight));
+        // NOTE(review): visiting items in descending order (the classic LPT heuristic) usually
+        // balances bins more tightly, but would change the assignments the tests currently pin.
+
+        // 2. packing: open up to binNumber bins, then always add to the lightest bin
+        PriorityQueue<Bin<T>> bins = new PriorityQueue<>();
+        for (Weighted<T> weighted : sorted) {
+            Bin<T> bin = bins.size() < binNumber ? new Bin<>() : bins.poll();
+            bin.add(weighted.item, weighted.weight);
+            bins.add(bin);
+        }
+
+        // 3. output
+        List<List<T>> packed = new ArrayList<>(bins.size());
+        bins.forEach(bin -> packed.add(bin.items));
+        return packed;
+    }
+
+    /** An item paired with its precomputed weight. */
+    private static class Weighted<T> {
+        private final T item;
+        private final long weight;
+
+        Weighted(T item, long weight) {
+            this.item = item;
+            this.weight = weight;
+        }
+    }
+
+    /** A bin accumulating items; bins are ordered by their total weight. */
+    private static class Bin<T> implements Comparable<Bin<T>> {
+        private final List<T> items = new ArrayList<>();
+        private long binWeight = 0L;
+
+        void add(T item, long weight) {
+            this.binWeight += weight;
+            items.add(item);
+        }
+
+        @Override
+        public int compareTo(Bin<T> other) {
+            return Long.compare(binWeight, other.binWeight);
+        }
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/FixBinPackingTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/FixBinPackingTest.java
new file mode 100644
index 000000000..4a0736315
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/FixBinPackingTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FixBinPacking}. */
+public class FixBinPackingTest {
+
+    @Test
+    public void test() {
+        List<List<Integer>> pack =
+                FixBinPacking.pack(Arrays.asList(1, 5, 1, 2, 3, 6, 2), Integer::longValue, 3);
+        assertThat(pack)
+                .containsExactlyInAnyOrder(
+                        Arrays.asList(1, 3), Arrays.asList(2, 5), Arrays.asList(1, 2, 6));
+    }
+
+    /** Bins are only opened on demand, so fewer items than bins yields one bin per item. */
+    @Test
+    public void testFewerItemsThanBins() {
+        List<List<Integer>> pack = FixBinPacking.pack(Arrays.asList(7, 4), Integer::longValue, 5);
+        assertThat(pack).containsExactlyInAnyOrder(Arrays.asList(4), Arrays.asList(7));
+    }
+
+    /** Packing nothing produces no bins at all. */
+    @Test
+    public void testEmptyInput() {
+        assertThat(FixBinPacking.pack(Arrays.<Integer>asList(), Integer::longValue, 3)).isEmpty();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
index 54361a622..48e2ad00e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.FixBinPacking;
 
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -61,12 +62,11 @@ public class StaticFileStoreSplitEnumerator
 
     private static Map<Integer, Queue<FileStoreSourceSplit>> 
createSplitAssignment(
             Collection<FileStoreSourceSplit> splits, int numReaders) {
+        // Balance the splits across readers by row count: FixBinPacking visits the splits in
+        // ascending row-count order and adds each one to the bin with the smallest total so far.
+        List<List<FileStoreSourceSplit>> assignmentList =
+                FixBinPacking.pack(splits, split -> split.split().rowCount(), 
numReaders);
         Map<Integer, Queue<FileStoreSourceSplit>> assignment = new HashMap<>();
-        int i = 0;
-        for (FileStoreSourceSplit split : splits) {
-            int task = i % numReaders;
-            assignment.computeIfAbsent(task, k -> new 
LinkedList<>()).add(split);
-            i++;
+        // The reader index is just the bin's position in the packed list, so which reader gets
+        // which bin is arbitrary. Bins are only created on demand, so with fewer splits than
+        // readers some readers get no entry in the map.
+        for (int i = 0; i < assignmentList.size(); i++) {
+            assignment.put(i, new LinkedList<>(assignmentList.get(i)));
         }
         return assignment;
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
index a0cca4042..fa3bea8cc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorTest.java
@@ -127,9 +127,9 @@ public class StaticFileStoreSplitEnumeratorTest {
         Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
                 context.getSplitAssignments();
         assertThat(assignments).containsOnlyKeys(0, 1);
-        assertThat(assignments.get(0).getAssignedSplits())
+        assertThat(assignments.get(1).getAssignedSplits())
                 .containsExactly(splits.get(0), splits.get(2));
-        
assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1));
+        
assertThat(assignments.get(0).getAssignedSplits()).containsExactly(splits.get(1));
     }
 
     @Test

Reply via email to