This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 9cf4f76b736000a2ccd31a937551c0bca1c2e557
Author: fangyong <[email protected]>
AuthorDate: Tue Sep 27 13:03:34 2022 +0800

    [FLINK-28256] Move the write and prepareCommit logic of AbstractTableWrite to FileStoreWrite
---
 .../file/operation/AbstractFileStoreWrite.java      | 100 +++++++++++++
 .../table/store/file/operation/FileStoreWrite.java  |  39 ++++-
 .../file/operation/KeyValueFileStoreWrite.java      |   4 +-
 .../operation/MemoryFileStoreWrite.java}            |  19 +-
 .../store/table/AppendOnlyFileStoreTable.java       |  31 ++--
 .../table/ChangelogValueCountFileStoreTable.java    |  61 ++++----
 .../table/ChangelogWithKeyFileStoreTable.java       |  69 +++++----
 .../table/store/table/sink/AbstractTableWrite.java  | 166 ---------------------
 .../table/store/table/sink/TableWriteImpl.java      |  80 ++++++++++
 .../store/table/sink/WriteRecordConverter.java      |  43 ++++++
 10 files changed, 369 insertions(+), 243 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index c26ab955..50334778 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -22,14 +22,23 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Base {@link FileStoreWrite} implementation.
@@ -43,9 +52,19 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
     @Nullable protected IOManager ioManager;
 
+    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
+    private final ExecutorService compactExecutor;
+
+    private boolean overwrite = false;
+
     protected AbstractFileStoreWrite(SnapshotManager snapshotManager, FileStoreScan scan) {
         this.snapshotManager = snapshotManager;
         this.scan = scan;
+
+        this.writers = new HashMap<>();
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
     }
 
     @Override
@@ -67,4 +86,85 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         }
         return existingFileMetas;
     }
+
+    public void withOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+    }
+
+    @Override
+    public void write(BinaryRowData partition, int bucket, T data) throws Exception {
+        RecordWriter<T> writer = getWriter(partition, bucket);
+        writer.write(data);
+    }
+
+    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
+        List<FileCommittable> result = new ArrayList<>();
+
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
+                writers.entrySet().iterator();
+        while (partIter.hasNext()) {
+            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
+            BinaryRowData partition = partEntry.getKey();
+            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter =
+                    partEntry.getValue().entrySet().iterator();
+            while (bucketIter.hasNext()) {
+                Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
+                int bucket = entry.getKey();
+                RecordWriter<T> writer = entry.getValue();
+                RecordWriter.CommitIncrement increment = writer.prepareCommit(endOfInput);
+                FileCommittable committable =
+                        new FileCommittable(
+                                partition,
+                                bucket,
+                                increment.newFilesIncrement(),
+                                increment.compactIncrement());
+                result.add(committable);
+
+                // clear if no update
+                // we need a mechanism to clear writers, otherwise there will be more and more
+                // such as yesterday's partition that no longer needs to be written.
+                if (committable.isEmpty()) {
+                    writer.close();
+                    bucketIter.remove();
+                }
+            }
+
+            if (partEntry.getValue().isEmpty()) {
+                partIter.remove();
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {
+        for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
+            for (RecordWriter<T> writer : bucketWriters.values()) {
+                writer.close();
+            }
+        }
+        writers.clear();
+        compactExecutor.shutdownNow();
+    }
+
+    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
+        Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
+        if (buckets == null) {
+            buckets = new HashMap<>();
+            writers.put(partition.copy(), buckets);
+        }
+        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket));
+    }
+
+    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
+        RecordWriter<T> writer =
+                overwrite
+                        ? createEmptyWriter(partition.copy(), bucket, compactExecutor)
+                        : createWriter(partition.copy(), bucket, compactExecutor);
+        notifyNewWriter(writer);
+        return writer;
+    }
+
+    protected void notifyNewWriter(RecordWriter<T> writer) {}
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 553ad3b6..85ecdcb7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -20,9 +20,12 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 
 import javax.annotation.Nullable;
 
@@ -31,7 +34,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 /**
- * Write operation which provides {@link RecordWriter} creation.
+ * Write operation which provides {@link RecordWriter} creation and writes {@link SinkRecord} to
+ * {@link FileStore}.
  *
  * @param <T> type of record to write.
  */
@@ -55,4 +59,37 @@ public interface FileStoreWrite<T> {
      */
     Callable<CompactResult> createCompactWriter(
             BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles);
+
+    /**
+     * If overwrite is true, the writer will overwrite the store, otherwise it won't.
+     *
+     * @param overwrite the overwrite flag
+     */
+    void withOverwrite(boolean overwrite);
+
+    /**
+     * Write the data to the store according to the partition and bucket.
+     *
+     * @param partition the partition of the data
+     * @param bucket the bucket id of the data
+     * @param data the given data
+     * @throws Exception the thrown exception when writing the record
+     */
+    void write(BinaryRowData partition, int bucket, T data) throws Exception;
+
+    /**
+     * Prepare commit in the write.
+     *
+     * @param endOfInput if true, the data writing is ended
+     * @return the file committable list
+     * @throws Exception the thrown exception
+     */
+    List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception;
+
+    /**
+     * Close the writer.
+     *
+     * @throws Exception the thrown exception
+     */
+    void close() throws Exception;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 5e79a499..5bfd1f19 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -58,7 +58,7 @@ import java.util.function.Supplier;
 import static org.apache.flink.table.store.file.io.DataFileMeta.getMaxSequenceNumber;
 
 /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
-public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
+public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
 
     private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
     private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
@@ -77,7 +77,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options) {
-        super(snapshotManager, scan);
+        super(snapshotManager, scan, options);
         this.readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
                         schemaManager,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
similarity index 83%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
index 06a217d7..84c08788 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.sink;
+package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
@@ -31,16 +31,17 @@ import java.util.Iterator;
 import java.util.Map;
 
 /**
- * A {@link TableWrite} which supports using shared memory and preempting memory from other writers.
+ * Base {@link FileStoreWrite} implementation which supports using shared memory and preempting
+ * memory from other writers.
+ *
+ * @param <T> type of record to write.
  */
-public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
-
+public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T> {
     private final MemoryPoolFactory memoryPoolFactory;
 
-    protected MemoryTableWrite(
-            FileStoreWrite<T> write, SinkRecordConverter recordConverter, CoreOptions options) {
-        super(write, recordConverter);
-
+    public MemoryFileStoreWrite(
+            SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options) {
+        super(snapshotManager, scan);
         HeapMemorySegmentPool memoryPool =
                 new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize());
         this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 5cc90e27..a8a85755 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -30,11 +30,11 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -108,21 +108,26 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        return new AbstractTableWrite<RowData>(store.newWrite(), recordConverter) {
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<RowData> writer)
-                    throws Exception {
-                Preconditions.checkState(
-                        record.row().getRowKind() == RowKind.INSERT,
-                        "Append only writer can not accept row with RowKind %s",
-                        record.row().getRowKind());
-                writer.write(record.row());
-            }
-        };
+        return new TableWriteImpl<>(
+                store.newWrite(), recordConverter, new AppendOnlyWriteRecordConverter());
     }
 
     @Override
     public AppendOnlyFileStore store() {
         return store;
     }
+
+    /** {@link WriteRecordConverter} implementation in {@link AppendOnlyFileStore}. */
+    private static class AppendOnlyWriteRecordConverter implements WriteRecordConverter<RowData> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public RowData write(SinkRecord record) throws Exception {
+            Preconditions.checkState(
+                    record.row().getRowKind() == RowKind.INSERT,
+                    "Append only writer can not accept row with RowKind %s",
+                    record.row().getRowKind());
+            return record.row();
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index e2a4e8c9..1604b8e7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -33,11 +33,11 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -122,33 +122,44 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
-
-            private final KeyValue kv = new KeyValue();
-
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
-                    throws Exception {
-                switch (record.row().getRowKind()) {
-                    case INSERT:
-                    case UPDATE_AFTER:
-                        kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
-                        break;
-                    case UPDATE_BEFORE:
-                    case DELETE:
-                        kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown row kind " + record.row().getRowKind());
-                }
-                writer.write(kv);
-            }
-        };
+        return new TableWriteImpl<>(
+                store.newWrite(), recordConverter, new ChangelogValueCountWriteRecordConverter());
     }
 
     @Override
     public KeyValueFileStore store() {
         return store;
     }
+
+    /**
+     * {@link WriteRecordConverter} implementation for {@link ChangelogValueCountFileStoreTable}.
+     */
+    private static class ChangelogValueCountWriteRecordConverter
+            implements WriteRecordConverter<KeyValue> {
+        private static final long serialVersionUID = 1L;
+
+        private transient KeyValue kv;
+
+        @Override
+        public KeyValue write(SinkRecord record) throws Exception {
+            if (kv == null) {
+                kv = new KeyValue();
+            }
+
+            switch (record.row().getRowKind()) {
+                case INSERT:
+                case UPDATE_AFTER:
+                    kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
+                    break;
+                case UPDATE_BEFORE:
+                case DELETE:
+                    kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown row kind " + record.row().getRowKind());
+            }
+            return kv;
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index ed464435..6785af4e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -35,12 +35,12 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SequenceGenerator;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -102,11 +102,12 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
             throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
         }
 
+        CoreOptions options = new CoreOptions(conf);
         this.store =
                 new KeyValueFileStore(
                         schemaManager,
                         tableSchema.id(),
-                        new CoreOptions(conf),
+                        options,
                         tableSchema.logicalPartitionType(),
                         addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
                         addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()),
@@ -190,34 +191,48 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        SequenceGenerator sequenceGenerator =
-                store.options()
-                        .sequenceField()
-                        .map(field -> new SequenceGenerator(field, schema().logicalRowType()))
-                        .orElse(null);
-        return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
-
-            private final KeyValue kv = new KeyValue();
-
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
-                    throws Exception {
-                long sequenceNumber =
-                        sequenceGenerator == null
-                                ? KeyValue.UNKNOWN_SEQUENCE
-                                : sequenceGenerator.generate(record.row());
-                writer.write(
-                        kv.replace(
-                                record.primaryKey(),
-                                sequenceNumber,
-                                record.row().getRowKind(),
-                                record.row()));
-            }
-        };
+        return new TableWriteImpl<>(
+                store.newWrite(),
+                recordConverter,
+                new ChangelogWithKeyWriteRecordConverter(store.options(), schema()));
     }
 
     @Override
     public KeyValueFileStore store() {
         return store;
     }
+
+    /** {@link WriteRecordConverter} implementation for {@link ChangelogWithKeyFileStoreTable}. */
+    private static class ChangelogWithKeyWriteRecordConverter
+            implements WriteRecordConverter<KeyValue> {
+        private final CoreOptions options;
+        private final TableSchema schema;
+        private transient SequenceGenerator sequenceGenerator;
+        private transient KeyValue kv;
+
+        private ChangelogWithKeyWriteRecordConverter(CoreOptions options, TableSchema schema) {
+            this.options = options;
+            this.schema = schema;
+        }
+
+        @Override
+        public KeyValue write(SinkRecord record) throws Exception {
+            if (sequenceGenerator == null) {
+                sequenceGenerator =
+                        options.sequenceField()
+                                .map(field -> new SequenceGenerator(field, schema.logicalRowType()))
+                                .orElse(null);
+            }
+            if (kv == null) {
+                kv = new KeyValue();
+            }
+
+            long sequenceNumber =
+                    sequenceGenerator == null
+                            ? KeyValue.UNKNOWN_SEQUENCE
+                            : sequenceGenerator.generate(record.row());
+            return kv.replace(
+                    record.primaryKey(), sequenceNumber, record.row().getRowKind(), record.row());
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
deleted file mode 100644
index 0371ae76..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.sink;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Base {@link TableWrite} implementation.
- *
- * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
- */
-public abstract class AbstractTableWrite<T> implements TableWrite {
-
-    private final FileStoreWrite<T> write;
-    private final SinkRecordConverter recordConverter;
-
-    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
-    private final ExecutorService compactExecutor;
-
-    private boolean overwrite = false;
-
-    protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
-        this.write = write;
-        this.recordConverter = recordConverter;
-
-        this.writers = new HashMap<>();
-        this.compactExecutor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory("compaction-thread"));
-    }
-
-    @Override
-    public TableWrite withOverwrite(boolean overwrite) {
-        this.overwrite = overwrite;
-        return this;
-    }
-
-    @Override
-    public TableWrite withIOManager(IOManager ioManager) {
-        this.write.withIOManager(ioManager);
-        return this;
-    }
-
-    @Override
-    public SinkRecordConverter recordConverter() {
-        return recordConverter;
-    }
-
-    @Override
-    public SinkRecord write(RowData rowData) throws Exception {
-        SinkRecord record = recordConverter.convert(rowData);
-        RecordWriter<T> writer = getWriter(record.partition(), record.bucket());
-        writeSinkRecord(record, writer);
-        return record;
-    }
-
-    @Override
-    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
-        List<FileCommittable> result = new ArrayList<>();
-
-        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
-                writers.entrySet().iterator();
-        while (partIter.hasNext()) {
-            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
-            BinaryRowData partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter =
-                    partEntry.getValue().entrySet().iterator();
-            while (bucketIter.hasNext()) {
-                Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
-                int bucket = entry.getKey();
-                RecordWriter<T> writer = entry.getValue();
-                RecordWriter.CommitIncrement increment = writer.prepareCommit(endOfInput);
-                FileCommittable committable =
-                        new FileCommittable(
-                                partition,
-                                bucket,
-                                increment.newFilesIncrement(),
-                                increment.compactIncrement());
-                result.add(committable);
-
-                // clear if no update
-                // we need a mechanism to clear writers, otherwise there will be more and more
-                // such as yesterday's partition that no longer needs to be written.
-                if (committable.isEmpty()) {
-                    writer.close();
-                    bucketIter.remove();
-                }
-            }
-
-            if (partEntry.getValue().isEmpty()) {
-                partIter.remove();
-            }
-        }
-
-        return result;
-    }
-
-    @Override
-    public void close() throws Exception {
-        for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
-            for (RecordWriter<T> writer : bucketWriters.values()) {
-                writer.close();
-            }
-        }
-        writers.clear();
-        compactExecutor.shutdownNow();
-    }
-
-    @VisibleForTesting
-    public Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers() {
-        return writers;
-    }
-
-    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter<T> writer)
-            throws Exception;
-
-    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
-        Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
-        if (buckets == null) {
-            buckets = new HashMap<>();
-            writers.put(partition.copy(), buckets);
-        }
-        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket));
-    }
-
-    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
-        RecordWriter<T> writer =
-                overwrite
-                        ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
-                        : write.createWriter(partition.copy(), bucket, compactExecutor);
-        notifyNewWriter(writer);
-        return writer;
-    }
-
-    protected void notifyNewWriter(RecordWriter<T> writer) {}
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
new file mode 100644
index 00000000..389d2514
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+
+import java.util.List;
+
+/**
+ * {@link TableWrite} implementation.
+ *
+ * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
+ */
+public class TableWriteImpl<T> implements TableWrite {
+
+    private final FileStoreWrite<T> write;
+    private final SinkRecordConverter recordConverter;
+    private final WriteRecordConverter<T> writeRecordConverter;
+
+    public TableWriteImpl(
+            FileStoreWrite<T> write,
+            SinkRecordConverter recordConverter,
+            WriteRecordConverter<T> writeRecordConverter) {
+        this.write = write;
+        this.recordConverter = recordConverter;
+        this.writeRecordConverter = writeRecordConverter;
+    }
+
+    @Override
+    public TableWrite withOverwrite(boolean overwrite) {
+        write.withOverwrite(overwrite);
+        return this;
+    }
+
+    @Override
+    public TableWrite withIOManager(IOManager ioManager) {
+        write.withIOManager(ioManager);
+        return this;
+    }
+
+    @Override
+    public SinkRecordConverter recordConverter() {
+        return recordConverter;
+    }
+
+    @Override
+    public SinkRecord write(RowData rowData) throws Exception {
+        SinkRecord record = recordConverter.convert(rowData);
+        write.write(record.partition(), record.bucket(), writeRecordConverter.write(record));
+        return record;
+    }
+
+    @Override
+    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
+        return write.prepareCommit(endOfInput);
+    }
+
+    @Override
+    public void close() throws Exception {
+        write.close();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
new file mode 100644
index 00000000..134e0322
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
+import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
+import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
+
+import java.io.Serializable;
+
+/**
+ * Validate and convert the {@link SinkRecord} to the record supported in different store tables.
+ *
+ * @param <T> type of record in store table.
+ */
+public interface WriteRecordConverter<T> extends Serializable {
+    /**
+     * Validate and convert the {@link SinkRecord} to the record, operations in {@link
+     * AppendOnlyFileStoreTable}, {@link ChangelogValueCountFileStoreTable} and {@link
+     * ChangelogWithKeyFileStoreTable} are different.
+     *
+     * @param record the record to write
+     * @return The data in different store tables
+     * @throws Exception the thrown exception
+     */
+    T write(SinkRecord record) throws Exception;
+}
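
For readers following this refactoring, below is a minimal, illustrative sketch (not part of the
commit) of how the pieces above fit together from a caller's point of view: TableWriteImpl converts
each incoming RowData into a SinkRecord, maps it through the table-specific WriteRecordConverter,
and forwards the result to FileStoreWrite, which now owns the per-partition/per-bucket writers and
the prepareCommit logic. The TableWriteExample class, the sample row, and the FileStoreTable
argument are assumptions made for illustration only; the TableWrite and FileCommittable calls are
the ones shown in the diff.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableWrite;

import java.util.List;

public class TableWriteExample {

    // `table` is assumed to be an already configured FileStoreTable,
    // e.g. an AppendOnlyFileStoreTable created from an existing schema.
    static List<FileCommittable> writeAndPrepareCommit(FileStoreTable table) throws Exception {
        TableWrite write = table.newWrite().withOverwrite(false);
        try {
            // Each row is converted by the table's SinkRecordConverter into a SinkRecord,
            // mapped by the WriteRecordConverter to the store's record type, and routed to
            // the per-partition, per-bucket RecordWriter held inside FileStoreWrite.
            RowData row = GenericRowData.of(1, StringData.fromString("a"));
            write.write(row);

            // Collect the file changes produced by all writers; a sink would hand these
            // FileCommittables to the committer.
            return write.prepareCommit(true);
        } finally {
            write.close();
        }
    }
}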
