This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 1601a3a2779fa2d6ffb2efb8de1d89fe5f4ca1d9 Author: JingsongLi <[email protected]> AuthorDate: Wed Jan 26 18:29:02 2022 +0800 [FLINK-25820] Introduce FileStoreSourceReader --- .../connector/source/FileStoreSourceReader.java | 73 ++++++++++++++++++++++ .../source/FileStoreSourceReaderTest.java | 68 ++++++++++++++++++++ 2 files changed, 141 insertions(+) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java new file mode 100644 index 0000000..d6a1aef --- /dev/null +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.file.src.util.RecordAndPosition; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.operation.FileStoreRead; + +import java.util.Map; + +/** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */ +public final class FileStoreSourceReader + extends SingleThreadMultiplexSourceReaderBase< + RecordAndPosition<RowData>, + RowData, + FileStoreSourceSplit, + FileStoreSourceSplitState> { + + public FileStoreSourceReader( + SourceReaderContext readerContext, FileStoreRead fileStoreRead, boolean keyAsRecord) { + super( + () -> new FileStoreSourceSplitReader(fileStoreRead, keyAsRecord), + (element, output, splitState) -> { + output.collect(element.getRecord()); + splitState.setPosition(element); + }, + readerContext.getConfiguration(), + readerContext); + } + + @Override + public void start() { + // we request a split only if we did not get splits during the checkpoint restore + if (getNumberOfCurrentlyAssignedSplits() == 0) { + context.sendSplitRequest(); + } + } + + @Override + protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSplitIds) { + context.sendSplitRequest(); + } + + @Override + protected FileStoreSourceSplitState initializedState(FileStoreSourceSplit split) { + return new FileStoreSourceSplitState(split); + } + + @Override + protected FileStoreSourceSplit toSplitType( + String splitId, FileStoreSourceSplitState splitState) { + return splitState.toSourceSplit(); + } +} diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java new file mode 100644 index 0000000..60abfc8 --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.core.fs.Path; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Collections; + +import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link FileStoreSourceReader}. */ +public class FileStoreSourceReaderTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testRequestSplitWhenNoSplitRestored() throws Exception { + final TestingReaderContext context = new TestingReaderContext(); + final FileStoreSourceReader reader = createReader(context); + + reader.start(); + reader.close(); + + assertThat(context.getNumSplitRequests()).isEqualTo(1); + } + + @Test + public void testNoSplitRequestWhenSplitRestored() throws Exception { + final TestingReaderContext context = new TestingReaderContext(); + final FileStoreSourceReader reader = createReader(context); + + reader.addSplits(Collections.singletonList(createTestFileSplit())); + reader.start(); + reader.close(); + + assertThat(context.getNumSplitRequests()).isEqualTo(0); + } + + private FileStoreSourceReader createReader(TestingReaderContext context) { + return new FileStoreSourceReader( + context, new TestFileStoreRead(new Path(tempDir.toUri()), null), false); + } + + private static FileStoreSourceSplit createTestFileSplit() { + return new FileStoreSourceSplit("id1", row(1), 0, Collections.emptyList()); + } +}
