This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 9cf4f76b736000a2ccd31a937551c0bca1c2e557
Author: fangyong <[email protected]>
AuthorDate: Tue Sep 27 13:03:34 2022 +0800

    [FLINK-28256] Move the write and prepareCommit logic of AbstractTableWrite to FileStoreWrite
---
 .../file/operation/AbstractFileStoreWrite.java     | 100 +++++++++++++
 .../table/store/file/operation/FileStoreWrite.java |  39 ++++-
 .../file/operation/KeyValueFileStoreWrite.java     |   4 +-
 .../operation/MemoryFileStoreWrite.java}           |  19 +--
 .../store/table/AppendOnlyFileStoreTable.java      |  31 ++--
 .../table/ChangelogValueCountFileStoreTable.java   |  61 ++++----
 .../table/ChangelogWithKeyFileStoreTable.java      |  69 +++++----
 .../table/store/table/sink/AbstractTableWrite.java | 166 ---------------------
 .../table/store/table/sink/TableWriteImpl.java     |  80 ++++++++++
 .../store/table/sink/WriteRecordConverter.java     |  43 ++++++
 10 files changed, 369 insertions(+), 243 deletions(-)
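
For context, here is a minimal usage sketch (not part of this commit) of the write path after this
refactoring, based only on the interfaces shown in the diff below; "table" (an AbstractFileStoreTable)
and "rows" are assumed to be available from elsewhere:

    // TableWriteImpl converts each RowData via the table-specific WriteRecordConverter and
    // delegates to FileStoreWrite.write(partition, bucket, data), which keeps one
    // RecordWriter per (partition, bucket) inside AbstractFileStoreWrite.
    TableWrite write = table.newWrite().withOverwrite(false);
    for (RowData row : rows) {
        write.write(row);
    }
    // prepareCommit() collects a CommitIncrement from every RecordWriter and returns one
    // FileCommittable per (partition, bucket); writers with nothing left to commit are closed.
    List<FileCommittable> committables = write.prepareCommit(true);
    write.close();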

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index c26ab955..50334778 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -22,14 +22,23 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Base {@link FileStoreWrite} implementation.
@@ -43,9 +52,19 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
 
     @Nullable protected IOManager ioManager;
 
+    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
+    private final ExecutorService compactExecutor;
+
+    private boolean overwrite = false;
+
     protected AbstractFileStoreWrite(SnapshotManager snapshotManager, FileStoreScan scan) {
         this.snapshotManager = snapshotManager;
         this.scan = scan;
+
+        this.writers = new HashMap<>();
+        this.compactExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("compaction-thread"));
     }
 
     @Override
@@ -67,4 +86,85 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         }
         return existingFileMetas;
     }
+
+    public void withOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+    }
+
+    @Override
+    public void write(BinaryRowData partition, int bucket, T data) throws Exception {
+        RecordWriter<T> writer = getWriter(partition, bucket);
+        writer.write(data);
+    }
+
+    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
+        List<FileCommittable> result = new ArrayList<>();
+
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
+                writers.entrySet().iterator();
+        while (partIter.hasNext()) {
+            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
+            BinaryRowData partition = partEntry.getKey();
+            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter =
+                    partEntry.getValue().entrySet().iterator();
+            while (bucketIter.hasNext()) {
+                Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
+                int bucket = entry.getKey();
+                RecordWriter<T> writer = entry.getValue();
+                RecordWriter.CommitIncrement increment = writer.prepareCommit(endOfInput);
+                FileCommittable committable =
+                        new FileCommittable(
+                                partition,
+                                bucket,
+                                increment.newFilesIncrement(),
+                                increment.compactIncrement());
+                result.add(committable);
+
+                // clear if no update
+                // we need a mechanism to clear writers, otherwise there will be more and more
+                // such as yesterday's partition that no longer needs to be written.
+                if (committable.isEmpty()) {
+                    writer.close();
+                    bucketIter.remove();
+                }
+            }
+
+            if (partEntry.getValue().isEmpty()) {
+                partIter.remove();
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public void close() throws Exception {
+        for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
+            for (RecordWriter<T> writer : bucketWriters.values()) {
+                writer.close();
+            }
+        }
+        writers.clear();
+        compactExecutor.shutdownNow();
+    }
+
+    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
+        Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
+        if (buckets == null) {
+            buckets = new HashMap<>();
+            writers.put(partition.copy(), buckets);
+        }
+        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket));
+    }
+
+    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
+        RecordWriter<T> writer =
+                overwrite
+                        ? createEmptyWriter(partition.copy(), bucket, compactExecutor)
+                        : createWriter(partition.copy(), bucket, compactExecutor);
+        notifyNewWriter(writer);
+        return writer;
+    }
+
+    protected void notifyNewWriter(RecordWriter<T> writer) {}
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 553ad3b6..85ecdcb7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -20,9 +20,12 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SinkRecord;
 
 import javax.annotation.Nullable;
 
@@ -31,7 +34,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 /**
- * Write operation which provides {@link RecordWriter} creation.
+ * Write operation which provides {@link RecordWriter} creation and writes {@link SinkRecord} to
+ * {@link FileStore}.
  *
  * @param <T> type of record to write.
  */
@@ -55,4 +59,37 @@ public interface FileStoreWrite<T> {
      */
     Callable<CompactResult> createCompactWriter(
             BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles);
+
+    /**
+     * If overwrite is true, the writer will overwrite the store, otherwise it won't.
+     *
+     * @param overwrite the overwrite flag
+     */
+    void withOverwrite(boolean overwrite);
+
+    /**
+     * Write the data to the store according to the partition and bucket.
+     *
+     * @param partition the partition of the data
+     * @param bucket the bucket id of the data
+     * @param data the given data
+     * @throws Exception the thrown exception when writing the record
+     */
+    void write(BinaryRowData partition, int bucket, T data) throws Exception;
+
+    /**
+     * Prepare commit in the write.
+     *
+     * @param endOfInput if true, the data writing is ended
+     * @return the file committable list
+     * @throws Exception the thrown exception
+     */
+    List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception;
+
+    /**
+     * Close the writer.
+     *
+     * @throws Exception the thrown exception
+     */
+    void close() throws Exception;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 5e79a499..5bfd1f19 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -58,7 +58,7 @@ import java.util.function.Supplier;
 import static org.apache.flink.table.store.file.io.DataFileMeta.getMaxSequenceNumber;
 
 /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
-public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
+public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
 
     private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
     private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
@@ -77,7 +77,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options) {
-        super(snapshotManager, scan);
+        super(snapshotManager, scan, options);
         this.readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
                         schemaManager,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
similarity index 83%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
index 06a217d7..84c08788 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.table.sink;
+package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
 
@@ -31,16 +31,17 @@ import java.util.Iterator;
 import java.util.Map;
 
 /**
- * A {@link TableWrite} which supports using shared memory and preempting memory from other writers.
+ * Base {@link FileStoreWrite} implementation which supports using shared memory and preempting
+ * memory from other writers.
+ *
+ * @param <T> type of record to write.
  */
-public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
-
+public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T> {
     private final MemoryPoolFactory memoryPoolFactory;
 
-    protected MemoryTableWrite(
-            FileStoreWrite<T> write, SinkRecordConverter recordConverter, CoreOptions options) {
-        super(write, recordConverter);
-
+    public MemoryFileStoreWrite(
+            SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options) {
+        super(snapshotManager, scan);
         HeapMemorySegmentPool memoryPool =
                 new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize());
         this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 5cc90e27..a8a85755 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -30,11 +30,11 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -108,21 +108,26 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        return new AbstractTableWrite<RowData>(store.newWrite(), recordConverter) {
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<RowData> writer)
-                    throws Exception {
-                Preconditions.checkState(
-                        record.row().getRowKind() == RowKind.INSERT,
-                        "Append only writer can not accept row with RowKind %s",
-                        record.row().getRowKind());
-                writer.write(record.row());
-            }
-        };
+        return new TableWriteImpl<>(
+                store.newWrite(), recordConverter, new AppendOnlyWriteRecordConverter());
     }
 
     @Override
     public AppendOnlyFileStore store() {
         return store;
     }
+
+    /** {@link WriteRecordConverter} implementation in {@link AppendOnlyFileStore}. */
+    private static class AppendOnlyWriteRecordConverter implements WriteRecordConverter<RowData> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public RowData write(SinkRecord record) throws Exception {
+            Preconditions.checkState(
+                    record.row().getRowKind() == RowKind.INSERT,
+                    "Append only writer can not accept row with RowKind %s",
+                    record.row().getRowKind());
+            return record.row();
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index e2a4e8c9..1604b8e7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -33,11 +33,11 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -122,33 +122,44 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
-
-            private final KeyValue kv = new KeyValue();
-
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
-                    throws Exception {
-                switch (record.row().getRowKind()) {
-                    case INSERT:
-                    case UPDATE_AFTER:
-                        kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
-                        break;
-                    case UPDATE_BEFORE:
-                    case DELETE:
-                        kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown row kind " + record.row().getRowKind());
-                }
-                writer.write(kv);
-            }
-        };
+        return new TableWriteImpl<>(
+                store.newWrite(), recordConverter, new ChangelogValueCountWriteRecordConverter());
     }
 
     @Override
     public KeyValueFileStore store() {
         return store;
     }
+
+    /**
+     * {@link WriteRecordConverter} implementation for {@link ChangelogValueCountFileStoreTable}.
+     */
+    private static class ChangelogValueCountWriteRecordConverter
+            implements WriteRecordConverter<KeyValue> {
+        private static final long serialVersionUID = 1L;
+
+        private transient KeyValue kv;
+
+        @Override
+        public KeyValue write(SinkRecord record) throws Exception {
+            if (kv == null) {
+                kv = new KeyValue();
+            }
+
+            switch (record.row().getRowKind()) {
+                case INSERT:
+                case UPDATE_AFTER:
+                    kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
+                    break;
+                case UPDATE_BEFORE:
+                case DELETE:
+                    kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown row kind " + record.row().getRowKind());
+            }
+            return kv;
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index ed464435..6785af4e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -35,12 +35,12 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SequenceGenerator;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.sink.TableWriteImpl;
+import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -102,11 +102,12 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                 throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
         }
 
+        CoreOptions options = new CoreOptions(conf);
         this.store =
                 new KeyValueFileStore(
                         schemaManager,
                         tableSchema.id(),
-                        new CoreOptions(conf),
+                        options,
                         tableSchema.logicalPartitionType(),
                         addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
                         addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()),
@@ -190,34 +191,48 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        SequenceGenerator sequenceGenerator =
-                store.options()
-                        .sequenceField()
-                        .map(field -> new SequenceGenerator(field, schema().logicalRowType()))
-                        .orElse(null);
-        return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
-
-            private final KeyValue kv = new KeyValue();
-
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
-                    throws Exception {
-                long sequenceNumber =
-                        sequenceGenerator == null
-                                ? KeyValue.UNKNOWN_SEQUENCE
-                                : sequenceGenerator.generate(record.row());
-                writer.write(
-                        kv.replace(
-                                record.primaryKey(),
-                                sequenceNumber,
-                                record.row().getRowKind(),
-                                record.row()));
-            }
-        };
+        return new TableWriteImpl<>(
+                store.newWrite(),
+                recordConverter,
+                new ChangelogWithKeyWriteRecordConverter(store.options(), schema()));
     }
 
     @Override
     public KeyValueFileStore store() {
         return store;
     }
+
+    /** {@link WriteRecordConverter} implementation for {@link ChangelogWithKeyFileStoreTable}. */
+    private static class ChangelogWithKeyWriteRecordConverter
+            implements WriteRecordConverter<KeyValue> {
+        private final CoreOptions options;
+        private final TableSchema schema;
+        private transient SequenceGenerator sequenceGenerator;
+        private transient KeyValue kv;
+
+        private ChangelogWithKeyWriteRecordConverter(CoreOptions options, TableSchema schema) {
+            this.options = options;
+            this.schema = schema;
+        }
+
+        @Override
+        public KeyValue write(SinkRecord record) throws Exception {
+            if (sequenceGenerator == null) {
+                sequenceGenerator =
+                        options.sequenceField()
+                                .map(field -> new SequenceGenerator(field, schema.logicalRowType()))
+                                .orElse(null);
+            }
+            if (kv == null) {
+                kv = new KeyValue();
+            }
+
+            long sequenceNumber =
+                    sequenceGenerator == null
+                            ? KeyValue.UNKNOWN_SEQUENCE
+                            : sequenceGenerator.generate(record.row());
+            return kv.replace(
+                    record.primaryKey(), sequenceNumber, record.row().getRowKind(), record.row());
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
deleted file mode 100644
index 0371ae76..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.sink;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Base {@link TableWrite} implementation.
- *
- * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
- */
-public abstract class AbstractTableWrite<T> implements TableWrite {
-
-    private final FileStoreWrite<T> write;
-    private final SinkRecordConverter recordConverter;
-
-    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
-    private final ExecutorService compactExecutor;
-
-    private boolean overwrite = false;
-
-    protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
-        this.write = write;
-        this.recordConverter = recordConverter;
-
-        this.writers = new HashMap<>();
-        this.compactExecutor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory("compaction-thread"));
-    }
-
-    @Override
-    public TableWrite withOverwrite(boolean overwrite) {
-        this.overwrite = overwrite;
-        return this;
-    }
-
-    @Override
-    public TableWrite withIOManager(IOManager ioManager) {
-        this.write.withIOManager(ioManager);
-        return this;
-    }
-
-    @Override
-    public SinkRecordConverter recordConverter() {
-        return recordConverter;
-    }
-
-    @Override
-    public SinkRecord write(RowData rowData) throws Exception {
-        SinkRecord record = recordConverter.convert(rowData);
-        RecordWriter<T> writer = getWriter(record.partition(), record.bucket());
-        writeSinkRecord(record, writer);
-        return record;
-    }
-
-    @Override
-    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
-        List<FileCommittable> result = new ArrayList<>();
-
-        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
-                writers.entrySet().iterator();
-        while (partIter.hasNext()) {
-            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
-            BinaryRowData partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter =
-                    partEntry.getValue().entrySet().iterator();
-            while (bucketIter.hasNext()) {
-                Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
-                int bucket = entry.getKey();
-                RecordWriter<T> writer = entry.getValue();
-                RecordWriter.CommitIncrement increment = writer.prepareCommit(endOfInput);
-                FileCommittable committable =
-                        new FileCommittable(
-                                partition,
-                                bucket,
-                                increment.newFilesIncrement(),
-                                increment.compactIncrement());
-                result.add(committable);
-
-                // clear if no update
-                // we need a mechanism to clear writers, otherwise there will be more and more
-                // such as yesterday's partition that no longer needs to be written.
-                if (committable.isEmpty()) {
-                    writer.close();
-                    bucketIter.remove();
-                }
-            }
-
-            if (partEntry.getValue().isEmpty()) {
-                partIter.remove();
-            }
-        }
-
-        return result;
-    }
-
-    @Override
-    public void close() throws Exception {
-        for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
-            for (RecordWriter<T> writer : bucketWriters.values()) {
-                writer.close();
-            }
-        }
-        writers.clear();
-        compactExecutor.shutdownNow();
-    }
-
-    @VisibleForTesting
-    public Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers() {
-        return writers;
-    }
-
-    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter<T> writer)
-            throws Exception;
-
-    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
-        Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
-        if (buckets == null) {
-            buckets = new HashMap<>();
-            writers.put(partition.copy(), buckets);
-        }
-        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket));
-    }
-
-    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
-        RecordWriter<T> writer =
-                overwrite
-                        ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
-                        : write.createWriter(partition.copy(), bucket, compactExecutor);
-        notifyNewWriter(writer);
-        return writer;
-    }
-
-    protected void notifyNewWriter(RecordWriter<T> writer) {}
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
new file mode 100644
index 00000000..389d2514
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+
+import java.util.List;
+
+/**
+ * {@link TableWrite} implementation.
+ *
+ * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
+ */
+public class TableWriteImpl<T> implements TableWrite {
+
+    private final FileStoreWrite<T> write;
+    private final SinkRecordConverter recordConverter;
+    private final WriteRecordConverter<T> writeRecordConverter;
+
+    public TableWriteImpl(
+            FileStoreWrite<T> write,
+            SinkRecordConverter recordConverter,
+            WriteRecordConverter<T> writeRecordConverter) {
+        this.write = write;
+        this.recordConverter = recordConverter;
+        this.writeRecordConverter = writeRecordConverter;
+    }
+
+    @Override
+    public TableWrite withOverwrite(boolean overwrite) {
+        write.withOverwrite(overwrite);
+        return this;
+    }
+
+    @Override
+    public TableWrite withIOManager(IOManager ioManager) {
+        write.withIOManager(ioManager);
+        return this;
+    }
+
+    @Override
+    public SinkRecordConverter recordConverter() {
+        return recordConverter;
+    }
+
+    @Override
+    public SinkRecord write(RowData rowData) throws Exception {
+        SinkRecord record = recordConverter.convert(rowData);
+        write.write(record.partition(), record.bucket(), writeRecordConverter.write(record));
+        return record;
+    }
+
+    @Override
+    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
+        return write.prepareCommit(endOfInput);
+    }
+
+    @Override
+    public void close() throws Exception {
+        write.close();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
new file mode 100644
index 00000000..134e0322
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
+import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
+import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
+
+import java.io.Serializable;
+
+/**
+ * Validate and convert the {@link SinkRecord} to the record supported in different store tables.
+ *
+ * @param <T> type of record in store table.
+ */
+public interface WriteRecordConverter<T> extends Serializable {
+    /**
+     * Validate and convert the {@link SinkRecord} to the record, operations in {@link
+     * AppendOnlyFileStoreTable}, {@link ChangelogValueCountFileStoreTable} and {@link
+     * ChangelogWithKeyFileStoreTable} are different.
+     *
+     * @param record the record to write
+     * @return The data in different store tables
+     * @throws Exception the thrown exception
+     */
+    T write(SinkRecord record) throws Exception;
+}

