This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 4cf7ef92021202cb101fa19bca915bcba98ed71c Author: JingsongLi <[email protected]> AuthorDate: Wed Jan 26 18:27:45 2022 +0800 [FLINK-25820] Introduce StaticFileStoreSplitEnumerator --- flink-table-store-connector/pom.xml | 28 ++++++ .../source/StaticFileStoreSplitEnumerator.java | 93 ++++++++++++++++++ .../source/StaticFileStoreSplitEnumeratorTest.java | 108 +++++++++++++++++++++ 3 files changed, 229 insertions(+) diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml index 896a793..a578b80 100644 --- a/flink-table-store-connector/pom.xml +++ b/flink-table-store-connector/pom.xml @@ -85,6 +85,34 @@ under the License. <scope>test</scope> <type>test-jar</type> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java new file mode 100644 index 0000000..6a884b2 --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.table.store.file.operation.FileStoreScan; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** A SplitEnumerator implementation for bounded / batch {@link FileStoreSource} input. 
*/ +public class StaticFileStoreSplitEnumerator + implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> { + // Used to look up registered readers, assign splits and signal completion. + private final SplitEnumeratorContext<FileStoreSourceSplit> context; + // Pending splits, served FIFO; splits handed back are appended at the tail. + private final Queue<FileStoreSourceSplit> splits; + /** Creates an enumerator over a private copy of {@code splits}. */ + public StaticFileStoreSplitEnumerator( + SplitEnumeratorContext<FileStoreSourceSplit> context, + Collection<FileStoreSourceSplit> splits) { + this.context = context; + this.splits = new LinkedList<>(splits); + } + /** Creates an enumerator over the splits generated eagerly from {@code scan}. */ + public StaticFileStoreSplitEnumerator( + SplitEnumeratorContext<FileStoreSourceSplit> context, FileStoreScan scan) { + this.context = context; + this.splits = new LinkedList<>(new FileStoreSourceSplitGenerator().createSplits(scan)); + } + + @Override + public void start() { + // no resources to start + } + /** Assigns the next split, or signals no more splits; unregistered readers are ignored. */ + @Override + public void handleSplitRequest(int subtask, @Nullable String hostname) { + if (!context.registeredReaders().containsKey(subtask)) { + // reader failed between sending the request and now. skip this request. + return; + } + // poll() returns null once every pending split has been handed out. + FileStoreSourceSplit split = splits.poll(); + if (split != null) { + context.assignSplit(split, subtask); + } else { + context.signalNoMoreSplits(subtask); + } + } + /** Re-enqueues splits at the tail; they are not reserved for {@code subtaskId}. */ + @Override + public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) { + splits.addAll(backSplits); + } + + @Override + public void addReader(int subtaskId) { + // this source is purely lazy-pull-based, nothing to do upon registration + } + /** Snapshots the splits that are currently pending (not assigned to any reader). */ + @Override + public PendingSplitsCheckpoint snapshotState(long checkpointId) { + return PendingSplitsCheckpoint.fromStatic(new ArrayList<>(splits)); + } + + @Override + public void close() { + // no resources to close + } +} diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java new file mode 100644 index 0000000..96c6bd6 ---
/dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile; +import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. 
*/ +public class StaticFileStoreSplitEnumeratorTest { + /** A snapshot taken before any split request still holds the pending split. */ + @Test + public void testCheckpointNoSplitRequested() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + final FileStoreSourceSplit split = createRandomSplit(); + final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split); + + final PendingSplitsCheckpoint checkpoint = enumerator.snapshotState(1L); + + assertThat(checkpoint.splits()).contains(split); + } + /** A registered reader receives the split, leaving nothing pending. */ + @Test + public void testSplitRequestForRegisteredReader() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + final FileStoreSourceSplit split = createRandomSplit(); + final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split); + + context.registerReader(3, "somehost"); + enumerator.addReader(3); + enumerator.handleSplitRequest(3, "somehost"); + + assertThat(enumerator.snapshotState(1L).splits()).isEmpty(); + assertThat(context.getSplitAssignments().get(3).getAssignedSplits()).contains(split); + } + /** A request from an unregistered reader assigns nothing and keeps the split pending. */ + @Test + public void testSplitRequestForNonRegisteredReader() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + final FileStoreSourceSplit split = createRandomSplit(); + final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split); + + enumerator.handleSplitRequest(3, "somehost"); + + assertThat(context.getSplitAssignments().containsKey(3)).isFalse(); + assertThat(enumerator.snapshotState(1L).splits()).contains(split); + } + /** Once the only split is assigned, the next request gets the no-more-splits signal. */ + @Test + public void testNoMoreSplits() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(4); + final FileStoreSourceSplit split = createRandomSplit(); + final StaticFileStoreSplitEnumerator enumerator = createEnumerator(context, split); + + // first split assignment + context.registerReader(1, "somehost"); +
enumerator.addReader(1); + enumerator.handleSplitRequest(1, "somehost"); + + // second request has no more split + enumerator.handleSplitRequest(1, "somehost"); + + assertThat(context.getSplitAssignments().get(1).getAssignedSplits()).contains(split); + assertThat(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal()).isTrue(); + } + + // ------------------------------------------------------------------------ + // test setup helpers + // ------------------------------------------------------------------------ + /** Builds a fixed split with id {@code "split"} and two files; nothing is random. */ + private static FileStoreSourceSplit createRandomSplit() { + return new FileStoreSourceSplit("split", row(1), 2, Arrays.asList(newFile(0), newFile(1))); + } + /** Creates an enumerator over the given {@code splits}. */ + private static StaticFileStoreSplitEnumerator createEnumerator( + final SplitEnumeratorContext<FileStoreSourceSplit> context, + final FileStoreSourceSplit... splits) { + + return new StaticFileStoreSplitEnumerator(context, Arrays.asList(splits)); + } +}
