This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 4cf7ef92021202cb101fa19bca915bcba98ed71c
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 26 18:27:45 2022 +0800

    [FLINK-25820] Introduce StaticFileStoreSplitEnumerator
---
 flink-table-store-connector/pom.xml                |  28 ++++++
 .../source/StaticFileStoreSplitEnumerator.java     |  93 ++++++++++++++++++
 .../source/StaticFileStoreSplitEnumeratorTest.java | 108 +++++++++++++++++++++
 3 files changed, 229 insertions(+)

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 896a793..a578b80 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -85,6 +85,34 @@ under the License.
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
new file mode 100644
index 0000000..6a884b2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * A {@link SplitEnumerator} for bounded / batch {@link FileStoreSource} input. Pending splits
+ * are kept in a queue and handed out one at a time, whenever a registered reader asks for one.
+ */
+public class StaticFileStoreSplitEnumerator
+        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
+
+    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
+    private final Queue<FileStoreSourceSplit> splits;
+
+    public StaticFileStoreSplitEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Collection<FileStoreSourceSplit> splits) {
+        this.context = context;
+        this.splits = new LinkedList<>(splits);
+    }
+
+    public StaticFileStoreSplitEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context, FileStoreScan scan) {
+        this.context = context;
+        this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(scan));
+    }
+
+    /** No-op: this enumerator has no resources to start. */
+    @Override
+    public void start() {}
+
+    /** Assigns the next pending split to {@code subtask}, or signals that no splits remain. */
+    @Override
+    public void handleSplitRequest(int subtask, @Nullable String hostname) {
+        // a reader may fail between sending its request and now; such requests are skipped
+        if (context.registeredReaders().containsKey(subtask)) {
+            FileStoreSourceSplit next = splits.poll();
+            if (next == null) {
+                context.signalNoMoreSplits(subtask);
+            } else {
+                context.assignSplit(next, subtask);
+            }
+        }
+    }
+
+    /** Appends the given splits to the pending queue; {@code subtaskId} is not used. */
+    @Override
+    public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
+        splits.addAll(backSplits);
+    }
+
+    /** No-op: this source is purely lazy-pull-based, so registration triggers nothing. */
+    @Override
+    public void addReader(int subtaskId) {}
+
+    /** Returns a checkpoint built from a copy of the splits that are still pending. */
+    @Override
+    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
+        return PendingSplitsCheckpoint.fromStatic(new ArrayList<>(splits));
+    }
+
+    /** No-op: this enumerator has no resources to close. */
+    @Override
+    public void close() {}
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
new file mode 100644
index 0000000..96c6bd6
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
+import static 
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */
+public class StaticFileStoreSplitEnumeratorTest {
+
+    @Test
+    public void testCheckpointNoSplitRequested() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        final FileStoreSourceSplit split = createRandomSplit();
+        final StaticFileStoreSplitEnumerator enumerator = 
createEnumerator(context, split);
+
+        final PendingSplitsCheckpoint checkpoint = 
enumerator.snapshotState(1L);
+
+        assertThat(checkpoint.splits()).contains(split);
+    }
+
+    @Test
+    public void testSplitRequestForRegisteredReader() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        final FileStoreSourceSplit split = createRandomSplit();
+        final StaticFileStoreSplitEnumerator enumerator = 
createEnumerator(context, split);
+
+        context.registerReader(3, "somehost");
+        enumerator.addReader(3);
+        enumerator.handleSplitRequest(3, "somehost");
+
+        assertThat(enumerator.snapshotState(1L).splits()).isEmpty();
+        
assertThat(context.getSplitAssignments().get(3).getAssignedSplits()).contains(split);
+    }
+
+    @Test
+    public void testSplitRequestForNonRegisteredReader() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        final FileStoreSourceSplit split = createRandomSplit();
+        final StaticFileStoreSplitEnumerator enumerator = 
createEnumerator(context, split);
+
+        enumerator.handleSplitRequest(3, "somehost");
+
+        assertThat(context.getSplitAssignments().containsKey(3)).isFalse();
+        assertThat(enumerator.snapshotState(1L).splits()).contains(split);
+    }
+
+    @Test
+    public void testNoMoreSplits() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(4);
+        final FileStoreSourceSplit split = createRandomSplit();
+        final StaticFileStoreSplitEnumerator enumerator = 
createEnumerator(context, split);
+
+        // first split assignment
+        context.registerReader(1, "somehost");
+        enumerator.addReader(1);
+        enumerator.handleSplitRequest(1, "somehost");
+
+        // second request has no more split
+        enumerator.handleSplitRequest(1, "somehost");
+
+        
assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split);
+        
assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue();
+    }
+
+    // ------------------------------------------------------------------------
+    //  test setup helpers
+    // ------------------------------------------------------------------------
+
+    private static FileStoreSourceSplit createRandomSplit() {
+        return new FileStoreSourceSplit("split", row(1), 2, 
Arrays.asList(newFile(0), newFile(1)));
+    }
+
+    private static StaticFileStoreSplitEnumerator createEnumerator(
+            final SplitEnumeratorContext<FileStoreSourceSplit> context,
+            final FileStoreSourceSplit... splits) {
+
+        return new StaticFileStoreSplitEnumerator(context, 
Arrays.asList(splits));
+    }
+}

Reply via email to