This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit c94dfa8191bc39bf57e05b1e37840a3e0e4cd43d Author: JingsongLi <[email protected]> AuthorDate: Thu Jan 13 17:38:15 2022 +0800 [FLINK-25630] Introduce RecordWriter --- .../table/store/file/mergetree/Increment.java | 63 ++++++++++++++++++++++ .../flink/table/store/file/utils/RecordWriter.java | 57 ++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java new file mode 100644 index 0000000..c8afffe --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; + +import java.util.Collections; +import java.util.List; + +/** + * Incremental files for merge tree. It consists of two parts: + * + * <ul> + * <li>New files: The new files generated in this snapshot cycle. They must be committed. 
+ * <li>Compact files: The {@link #compactBefore} files are compacted to {@link #compactAfter} + * files in this snapshot cycle. The compaction is an optimization of files. + * </ul> + */ +public class Increment { + + private final List<SstFileMeta> newFiles; + + private final List<SstFileMeta> compactBefore; + + private final List<SstFileMeta> compactAfter; + + public Increment( + List<SstFileMeta> newFiles, + List<SstFileMeta> beCompacted, + List<SstFileMeta> compacted) { /* Each argument is wrapped in an unmodifiable view; the caller's list is not copied. */ + this.newFiles = Collections.unmodifiableList(newFiles); + this.compactBefore = Collections.unmodifiableList(beCompacted); + this.compactAfter = Collections.unmodifiableList(compacted); + } + + public List<SstFileMeta> newFiles() { + return newFiles; + } + + public List<SstFileMeta> compactBefore() { + return compactBefore; + } + + public List<SstFileMeta> compactAfter() { + return compactAfter; + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java new file mode 100644 index 0000000..4047301 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.mergetree.Increment; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; + +import java.util.List; + +/** + * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to + * write yet un-staged data. The incremental files ready to commit are returned to the system by the + * {@link #prepareCommit()}. + */ +public interface RecordWriter { + + /** Add a key-value element to the writer. */ + void write(ValueKind valueKind, RowData key, RowData value) throws Exception; + + /** + * Prepare for a commit. + * + * @return Incremental files in this snapshot cycle + */ + Increment prepareCommit() throws Exception; + + /** + * Sync the writer. The structure related to file reading and writing is thread unsafe; there + * are asynchronous threads inside the writer, which should be synced before reading data. + */ + void sync() throws Exception; + + /** + * Close this writer. The call will delete newly generated but not committed files. + * + * @return Deleted files. + */ + List<SstFileMeta> close() throws Exception; +}
