This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 4296d7c1 [FLINK-31252] Improve StaticFileStoreSplitEnumerator to
assign batch splits
4296d7c1 is described below
commit 4296d7c1cca7ff8fb5525401b1ef1659aae5879a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 3 15:00:41 2023 +0800
[FLINK-31252] Improve StaticFileStoreSplitEnumerator to assign batch splits
This closes #563
---
.../source/StaticFileStoreSplitEnumerator.java | 44 ++++++--
.../source/ContinuousFileSplitEnumeratorTest.java | 2 +-
.../source/StaticFileStoreSplitEnumeratorTest.java | 122 +++++++++++++++++++++
3 files changed, 157 insertions(+), 11 deletions(-)
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 1712ebde..82dcd20a 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -20,15 +20,17 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.table.store.file.Snapshot;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.Queue;
+import java.util.Map;
/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource}
input. */
public class StaticFileStoreSplitEnumerator
@@ -38,7 +40,7 @@ public class StaticFileStoreSplitEnumerator
@Nullable private final Snapshot snapshot;
- private final Queue<FileStoreSourceSplit> splits;
+ private final Map<Integer, List<FileStoreSourceSplit>>
pendingSplitAssignment;
public StaticFileStoreSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
@@ -46,7 +48,19 @@ public class StaticFileStoreSplitEnumerator
Collection<FileStoreSourceSplit> splits) {
this.context = context;
this.snapshot = snapshot;
- this.splits = new LinkedList<>(splits);
+ this.pendingSplitAssignment = createSplitAssignment(splits,
context.currentParallelism());
+ }
+
+ private static Map<Integer, List<FileStoreSourceSplit>>
createSplitAssignment(
+ Collection<FileStoreSourceSplit> splits, int numReaders) {
+ Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
+ int i = 0;
+ for (FileStoreSourceSplit split : splits) {
+ int task = i % numReaders;
+ assignment.computeIfAbsent(task, k -> new
ArrayList<>()).add(split);
+ i++;
+ }
+ return assignment;
}
@Override
@@ -61,9 +75,16 @@ public class StaticFileStoreSplitEnumerator
return;
}
- FileStoreSourceSplit split = splits.poll();
- if (split != null) {
- context.assignSplit(split, subtask);
+ // The following batch assignment operation is for two purposes:
+ // 1. To distribute splits evenly when batch reading to prevent a few
tasks from reading all
+ // the data (for example, the current resource can only schedule part
of the tasks).
+ // 2. Optimize limit reading. In limit reading, the task will
repeatedly create SplitFetcher
+ // to read the data of the limit number for each coming split (the
limit status is in the
+ // SplitFetcher). So if the assignments are divided too small, the task
will cost more time on
+ // creating SplitFetcher and reading data.
+ List<FileStoreSourceSplit> splits =
pendingSplitAssignment.remove(subtask);
+ if (splits != null && splits.size() > 0) {
+ context.assignSplits(new
SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
} else {
context.signalNoMoreSplits(subtask);
}
@@ -71,7 +92,9 @@ public class StaticFileStoreSplitEnumerator
@Override
public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int
subtaskId) {
- splits.addAll(backSplits);
+ pendingSplitAssignment
+ .computeIfAbsent(subtaskId, k -> new ArrayList<>())
+ .addAll(backSplits);
}
@Override
@@ -81,8 +104,9 @@ public class StaticFileStoreSplitEnumerator
@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
- return new PendingSplitsCheckpoint(
- new ArrayList<>(splits), snapshot == null ? null :
snapshot.id());
+ List<FileStoreSourceSplit> splits = new ArrayList<>();
+ pendingSplitAssignment.values().forEach(splits::addAll);
+ return new PendingSplitsCheckpoint(splits, snapshot == null ? null :
snapshot.id());
}
@Override
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
index d74f3d5b..881e7753 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
@@ -136,7 +136,7 @@ public class ContinuousFileSplitEnumeratorTest {
assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2,
4));
}
- private static FileStoreSourceSplit createSnapshotSplit(
+ public static FileStoreSourceSplit createSnapshotSplit(
int snapshotId, int bucket, List<DataFileMeta> files) {
return new FileStoreSourceSplit(
UUID.randomUUID().toString(),
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
new file mode 100644
index 00000000..04ceee51
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
+import static
org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumeratorTest.createSnapshotSplit;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */
+public class StaticFileStoreSplitEnumeratorTest {
+
+ @Test
+ public void testSplitAllocation() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(2);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+
+ List<FileStoreSourceSplit> splits = new ArrayList<>();
+ for (int i = 1; i <= 4; i++) {
+ splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+ }
+ StaticFileStoreSplitEnumerator enumerator =
+ new StaticFileStoreSplitEnumerator(context, null, splits);
+
+ // test assign
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(1, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assertThat(assignments.get(0).getAssignedSplits())
+ .containsExactly(splits.get(0), splits.get(2));
+ assertThat(assignments.get(1).getAssignedSplits())
+ .containsExactly(splits.get(1), splits.get(3));
+
+ // test addSplitsBack
+ enumerator.addSplitsBack(assignments.get(0).getAssignedSplits(), 0);
+ context.getSplitAssignments().clear();
+ assertThat(context.getSplitAssignments()).isEmpty();
+ enumerator.handleSplitRequest(0, "test-host");
+ assertThat(assignments.get(0).getAssignedSplits())
+ .containsExactly(splits.get(0), splits.get(2));
+ }
+
+ @Test
+ public void testSplitAllocationNotEvenly() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(2);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+
+ List<FileStoreSourceSplit> splits = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+ }
+ StaticFileStoreSplitEnumerator enumerator =
+ new StaticFileStoreSplitEnumerator(context, null, splits);
+
+ // test assign
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(1, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1);
+ assertThat(assignments.get(0).getAssignedSplits())
+ .containsExactly(splits.get(0), splits.get(2));
+
assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1));
+ }
+
+ @Test
+ public void testSplitAllocationSomeEmpty() {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ new TestingSplitEnumeratorContext<>(3);
+ context.registerReader(0, "test-host");
+ context.registerReader(1, "test-host");
+ context.registerReader(2, "test-host");
+
+ List<FileStoreSourceSplit> splits = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+ }
+ StaticFileStoreSplitEnumerator enumerator =
+ new StaticFileStoreSplitEnumerator(context, null, splits);
+
+ // test assign
+ enumerator.handleSplitRequest(0, "test-host");
+ enumerator.handleSplitRequest(1, "test-host");
+ enumerator.handleSplitRequest(2, "test-host");
+ Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+ context.getSplitAssignments();
+ assertThat(assignments).containsOnlyKeys(0, 1, 2);
+
assertThat(assignments.get(0).getAssignedSplits()).containsExactly(splits.get(0));
+
assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1));
+ assertThat(assignments.get(2).getAssignedSplits()).isEmpty();
+ }
+}