This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit a7819f72ea94fce97552e5e4da56c91b92932929
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 26 18:28:31 2022 +0800

    [FLINK-25820] Introduce FileStoreSourceSplitReader
---
 .../source/FileStoreSourceSplitReader.java         | 203 +++++++++++++++
 .../source/FileStoreSourceSplitReaderTest.java     | 284 +++++++++++++++++++++
 .../store/connector/source/TestFileStoreRead.java  | 117 +++++++++
 3 files changed, 604 insertions(+)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
new file mode 100644
index 0000000..7db53e2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.impl.FileRecords;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/** The {@link SplitReader} implementation for the file store source. */
+public class FileStoreSourceSplitReader
+        implements SplitReader<RecordAndPosition<RowData>, FileStoreSourceSplit> {
+
+    private final FileStoreRead fileStoreRead;
+    private final boolean keyAsRecord;
+
+    private final Queue<FileStoreSourceSplit> splits;
+
+    private final Pool<FileStoreRecordIterator> pool;
+
+    @Nullable private RecordReader currentReader;
+    @Nullable private String currentSplitId;
+    private long currentNumRead;
+    private RecordReader.RecordIterator currentFirstBatch;
+
+    public FileStoreSourceSplitReader(FileStoreRead fileStoreRead, boolean keyAsRecord) {
+        this.fileStoreRead = fileStoreRead;
+        this.keyAsRecord = keyAsRecord;
+        this.splits = new LinkedList<>();
+        this.pool = new Pool<>(1);
+        this.pool.add(new FileStoreRecordIterator());
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<RowData>> fetch() throws IOException {
+        checkSplitOrStartNext();
+
+        // Poll an iterator from the pool first. The pool size is 1, because the
+        // underlying implementation does not allow multiple batches to be read at
+        // the same time.
+        FileStoreRecordIterator iterator = pool();
+
+        RecordReader.RecordIterator nextBatch;
+        if (currentFirstBatch != null) {
+            nextBatch = currentFirstBatch;
+            currentFirstBatch = null;
+        } else {
+            nextBatch = currentReader.readBatch();
+        }
+        if (nextBatch == null) {
+            pool.recycler().recycle(iterator);
+            return finishSplit();
+        }
+        return FileRecords.forRecords(currentSplitId, iterator.replace(nextBatch));
+    }
+
+    private FileStoreRecordIterator pool() throws IOException {
+        try {
+            return this.pool.pollEntry();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted");
+        }
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<FileStoreSourceSplit> splitsChange) {
+        if (!(splitsChange instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChange.getClass()));
+        }
+
+        splits.addAll(splitsChange.splits());
+    }
+
+    @Override
+    public void wakeUp() {}
+
+    @Override
+    public void close() throws Exception {
+        if (currentReader != null) {
+            currentReader.close();
+        }
+    }
+
+    private void checkSplitOrStartNext() throws IOException {
+        if (currentReader != null) {
+            return;
+        }
+
+        final FileStoreSourceSplit nextSplit = splits.poll();
+        if (nextSplit == null) {
+            throw new IOException("Cannot fetch from another split - no split remaining");
+        }
+
+        currentSplitId = nextSplit.splitId();
+        currentReader =
+                fileStoreRead.createReader(
+                        nextSplit.partition(), nextSplit.bucket(), nextSplit.files());
+        currentNumRead = nextSplit.recordsToSkip();
+        if (currentNumRead > 0) {
+            seek(currentNumRead);
+        }
+    }
+
+    private void seek(long toSkip) throws IOException {
+        while (true) {
+            RecordReader.RecordIterator nextBatch = currentReader.readBatch();
+            if (nextBatch == null) {
+                throw new RuntimeException(
+                        String.format(
+                                "skip(%s) more than the number of remaining elements.", toSkip));
+            }
+            while (toSkip > 0 && nextBatch.next() != null) {
+                toSkip--;
+            }
+            if (toSkip == 0) {
+                currentFirstBatch = nextBatch;
+                return;
+            }
+            nextBatch.releaseBatch();
+        }
+    }
+
+    private FileRecords<RowData> finishSplit() throws IOException {
+        if (currentReader != null) {
+            currentReader.close();
+            currentReader = null;
+        }
+
+        final FileRecords<RowData> finishRecords = FileRecords.finishedSplit(currentSplitId);
+        currentSplitId = null;
+        return finishRecords;
+    }
+
+    private class FileStoreRecordIterator implements BulkFormat.RecordIterator<RowData> {
+
+        private RecordReader.RecordIterator iterator;
+
+        private final MutableRecordAndPosition<RowData> recordAndPosition =
+                new MutableRecordAndPosition<>();
+
+        public FileStoreRecordIterator replace(RecordReader.RecordIterator iterator) {
+            this.iterator = iterator;
+            this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentNumRead);
+            return this;
+        }
+
+        @Nullable
+        @Override
+        public RecordAndPosition<RowData> next() {
+            try {
+                KeyValue kv = iterator.next();
+                if (kv == null) {
+                    return null;
+                }
+                recordAndPosition.setNext(keyAsRecord ? kv.key() : kv.value());
+                currentNumRead++;
+                return recordAndPosition;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void releaseBatch() {
+            this.iterator.releaseBatch();
+            pool.recycler().recycle(this);
+        }
+    }
+}
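(Illustrative sketch, not part of the commit: the reader is pull-based, so a caller
assigns splits and polls fetch() until the split is reported finished. In production
this would sit behind Flink's SourceReaderBase fetcher threads; here `read` stands in
for a FileStoreRead, `split` for an already-built FileStoreSourceSplit, and `consume`
for a hypothetical downstream consumer. Mirrors the tests below.)

    FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
    reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));

    RecordsWithSplitIds<RecordAndPosition<RowData>> batch = reader.fetch();
    if (batch.nextSplit() != null) { // position on the (single) split in this batch
        RecordAndPosition<RowData> record;
        while ((record = batch.nextRecordFromSplit()) != null) {
            // getRecordSkipCount() counts records consumed so far in the split,
            // which is what FileStoreSourceSplit#recordsToSkip restores on recovery.
            consume(record.getRecord());
        }
        batch.recycle(); // returns the pooled iterator so the next fetch() can proceed
    }
    // Once the split is exhausted, the next fetch() closes the current reader
    // and reports the split id via finishedSplits().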
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
new file mode 100644
index 0000000..89068e6
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FileStoreSourceSplitReader}. */
+public class FileStoreSourceSplitReaderTest {
+
+    private static ExecutorService service;
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private final AtomicInteger v = new AtomicInteger(0);
+
+    @BeforeAll
+    public static void before() {
+        service = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterAll
+    public static void after() {
+        service.shutdownNow();
+        service = null;
+    }
+
+    @Test
+    public void testKeyAsRecord() throws Exception {
+        innerTestOnce(true);
+    }
+
+    @Test
+    public void testNonKeyAsRecord() throws Exception {
+        innerTestOnce(false);
+    }
+
+    private void innerTestOnce(boolean keyAsRecord) throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, keyAsRecord);
+
+        List<Tuple2<Integer, Integer>> input = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                0,
+                input.stream()
+                        .map(tuple2 -> keyAsRecord ? tuple2._1 : tuple2._2)
+                        .collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleBatchInSplit() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input1 = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input1);
+
+        List<Tuple2<Integer, Integer>> input2 = kvs();
+        List<SstFileMeta> files2 = read.writeFiles(row(1), 0, input2);
+        files.addAll(files2);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                0,
+                input1.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                5,
+                input2.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 3));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                3,
+                input.subList(3, input.size()).stream()
+                        .map(Tuple2::_2)
+                        .collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testRestoreMultipleBatchInSplit() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input1 = kvs();
+        List<SstFileMeta> files = read.writeFiles(row(1), 0, input1);
+
+        List<Tuple2<Integer, Integer>> input2 = kvs();
+        List<SstFileMeta> files2 = read.writeFiles(row(1), 0, input2);
+        files.addAll(files2);
+
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files, 7));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                7,
+                input2.subList(2, input2.size()).stream()
+                        .map(Tuple2::_2)
+                        .collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testMultipleSplits() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+
+        List<Tuple2<Integer, Integer>> input1 = kvs();
+        List<SstFileMeta> files1 = read.writeFiles(row(1), 0, input1);
+        assignSplit(reader, new FileStoreSourceSplit("id1", row(1), 0, files1));
+
+        List<Tuple2<Integer, Integer>> input2 = kvs();
+        List<SstFileMeta> files2 = read.writeFiles(row(2), 1, input2);
+        assignSplit(reader, new FileStoreSourceSplit("id2", row(2), 1, files2));
+
+        RecordsWithSplitIds<RecordAndPosition<RowData>> records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id1",
+                0,
+                input1.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id1", "id1", 0, null);
+
+        records = reader.fetch();
+        assertRecords(
+                records,
+                null,
+                "id2",
+                0,
+                input2.stream().map(Tuple2::_2).collect(Collectors.toList()));
+
+        records = reader.fetch();
+        assertRecords(records, "id2", "id2", 0, null);
+
+        reader.close();
+    }
+
+    @Test
+    public void testNoSplit() throws Exception {
+        TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
+        FileStoreSourceSplitReader reader = new FileStoreSourceSplitReader(read, false);
+        assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
+        reader.close();
+    }
+
+    private void assertRecords(
+            RecordsWithSplitIds<RecordAndPosition<RowData>> records,
+            String finishedSplit,
+            String nextSplit,
+            long startRecordSkipCount,
+            List<Integer> expected) {
+        if (finishedSplit != null) {
+            assertThat(records.finishedSplits()).isEqualTo(Collections.singleton(finishedSplit));
+            return;
+        } else {
+            assertThat(records.finishedSplits()).isEmpty();
+        }
+
+        assertThat(records.nextSplit()).isEqualTo(nextSplit);
+
+        List<Integer> result = new ArrayList<>();
+        RecordAndPosition<RowData> record;
+        while ((record = records.nextRecordFromSplit()) != null) {
+            result.add(record.getRecord().getInt(0));
+            assertThat(record.getRecordSkipCount()).isEqualTo(++startRecordSkipCount);
+        }
+        records.recycle();
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    private List<Tuple2<Integer, Integer>> kvs() {
+        List<Tuple2<Integer, Integer>> kvs = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            kvs.add(new Tuple2<>(next(), next()));
+        }
+        return kvs;
+    }
+
+    private int next() {
+        return v.incrementAndGet();
+    }
+
+    private void assignSplit(FileStoreSourceSplitReader reader, FileStoreSourceSplit split) {
+        SplitsChange<FileStoreSourceSplit> splitsChange =
+                new SplitsAddition<>(Collections.singletonList(split));
+        reader.handleSplitsChanges(splitsChange);
+    }
+}
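(A worked example of the restore arithmetic in testRestoreMultipleBatchInSplit: the
split carries two 5-record files and recordsToSkip = 7, so seek(7) drains all 5
records of the first batch, consumes 2 more from the second batch, and parks the
partially-read batch in currentFirstBatch. The first fetch() therefore resumes at
record-skip count 7 and emits only the remaining 3 records, input2.subList(2,
input2.size()).)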
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java
new file mode 100644
index 0000000..143d446
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestFileStoreRead.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import scala.Tuple2;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Test {@link FileStoreRead}.
+ *
+ * <p>TODO: remove this, use FileStoreReadImpl.
+ */
+public class TestFileStoreRead implements FileStoreRead {
+
+    private static final Comparator<RowData> COMPARATOR =
+            Comparator.comparingInt(o -> o.getInt(0));
+
+    private final FileStorePathFactory pathFactory;
+
+    private final ExecutorService service;
+
+    public TestFileStoreRead(Path root, ExecutorService service) {
+        this.pathFactory = new FileStorePathFactory(root, RowType.of(new IntType()), "default");
+        this.service = service;
+    }
+
+    @Override
+    public void withKeyProjection(int[][] projectedFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void withValueProjection(int[][] projectedFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RecordReader createReader(BinaryRowData partition, int bucket, List<SstFileMeta> files)
+            throws IOException {
+        MergeTree mergeTree = createMergeTree(partition, bucket);
+        List<List<SortedRun>> runs = new IntervalPartition(files, COMPARATOR).partition();
+        return mergeTree.createReader(runs, true);
+    }
+
+    private MergeTree createMergeTree(BinaryRowData partition, int bucket) {
+        MergeTreeOptions options = new MergeTreeOptions(new Configuration());
+        SstFile sstFile =
+                new SstFile.Factory(
+                                new RowType(
+                                        singletonList(new RowType.RowField("k", new IntType()))),
+                                new RowType(
+                                        singletonList(new RowType.RowField("v", new IntType()))),
+                                FileFormat.fromIdentifier(
+                                        Thread.currentThread().getContextClassLoader(),
+                                        "avro",
+                                        new Configuration()),
+                                pathFactory,
+                                options.targetFileSize)
+                        .create(partition, bucket);
+        return new MergeTree(options, sstFile, COMPARATOR, service, new DeduplicateAccumulator());
+    }
+
+    public List<SstFileMeta> writeFiles(
+            BinaryRowData partition, int bucket, List<Tuple2<Integer, Integer>> kvs)
+            throws Exception {
+        RecordWriter writer = createMergeTree(partition, bucket).createWriter(new ArrayList<>());
+        for (Tuple2<Integer, Integer> tuple2 : kvs) {
+            writer.write(
+                    ValueKind.ADD, GenericRowData.of(tuple2._1), GenericRowData.of(tuple2._2));
+        }
+        List<SstFileMeta> files = writer.prepareCommit().newFiles();
+        writer.close();
+        return new ArrayList<>(files);
+    }
+}
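(Illustrative round trip through this helper, not part of the commit; it assumes the
single-int key/value schema the helper hardwires, with `tempDir` and `service` as in
the tests above and imports elided:)

    TestFileStoreRead read = new TestFileStoreRead(new Path(tempDir.toUri()), service);
    List<Tuple2<Integer, Integer>> kvs =
            Arrays.asList(new Tuple2<>(1, 10), new Tuple2<>(2, 20));
    // Write to partition row(1), bucket 0, then read the files back.
    List<SstFileMeta> files = read.writeFiles(row(1), 0, kvs);
    RecordReader reader = read.createReader(row(1), 0, files);
    // Batches come back merged across sorted runs (IntervalPartition) and
    // deduplicated by key (DeduplicateAccumulator), ordered by COMPARATOR.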
