This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fedb65  [FLINK-27517] Introduce rolling file writer to write one 
record each time for append-only table
1fedb65 is described below

commit 1fedb6586ba292bedf00bb1b6922d5110a2f7fa8
Author: openinx <[email protected]>
AuthorDate: Sun May 8 20:02:20 2022 +0800

    [FLINK-27517] Introduce rolling file writer to write one record each time 
for append-only table
    
    This closes #110
---
 .../store/connector/sink/StoreSinkWriter.java      |   2 +-
 .../table/store/connector/sink/StoreSinkTest.java  |   2 +-
 .../table/store/connector/sink/TestFileStore.java  |   2 +-
 .../source/FileStoreSourceSplitReaderTest.java     |   2 +-
 .../store/connector/source/TestDataReadWrite.java  |   2 +-
 .../table/store/file/data/DataFileWriter.java      | 218 +++++++++------------
 .../table/store/file/manifest/ManifestFile.java    | 110 ++++++-----
 .../store/file/mergetree/MergeTreeWriter.java      |   2 +-
 .../table/store/file/operation/FileStoreWrite.java |   2 +-
 .../store/file/operation/FileStoreWriteImpl.java   |   2 +-
 .../flink/table/store/file/utils/RollingFile.java  | 125 ------------
 .../table/store/file/writer/BaseBulkWriter.java    |  59 ++++++
 .../table/store/file/writer/BaseFileWriter.java    | 121 ++++++++++++
 .../flink/table/store/file/writer/FileWriter.java  |  86 ++++++++
 .../store/file/{utils => writer}/RecordWriter.java |   2 +-
 .../table/store/file/writer/RollingFileWriter.java | 132 +++++++++++++
 .../flink/table/store/file/TestFileStore.java      |   2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |   2 +-
 18 files changed, 572 insertions(+), 301 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 2995b74..4a114b4 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.log.LogWriteCallback;
 import org.apache.flink.table.store.sink.SinkRecord;
 import org.apache.flink.table.store.sink.SinkRecordConverter;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index de48269..303e471 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import 
org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 3c98e3b..639ef35 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index acaac22..7002666 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.AfterAll;
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index abc1922..b6997e8 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
 import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index a57e6df..b46db7f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -33,7 +33,9 @@ import 
org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.RollingFile;
+import org.apache.flink.table.store.file.writer.BaseBulkWriter;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CloseableIterator;
 
@@ -41,9 +43,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Supplier;
 
 /** Writes {@link KeyValue}s into data files. */
 public class DataFileWriter {
@@ -91,176 +94,149 @@ public class DataFileWriter {
     }
 
     /**
-     * Write several {@link KeyValue}s into an data file of a given level.
+     * Write several {@link KeyValue}s into a data file of a given level.
      *
      * <p>NOTE: This method is atomic.
      */
     public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int 
level)
             throws Exception {
-        DataRollingFile rollingFile =
-                fileStatsExtractor == null
-                        ? new StatsCollectingRollingFile(level)
-                        : new FileExtractingRollingFile(level);
-        List<DataFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(iterator, result, filesToCleanUp);
+
+        RollingKvWriter rollingKvWriter = createRollingKvWriter(level, 
suggestedFileSize);
+        try (RollingKvWriter writer = rollingKvWriter) {
+            writer.write(iterator);
+
         } catch (Throwable e) {
             LOG.warn("Exception occurs when writing data files. Cleaning up.", 
e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingKvWriter.abort();
             throw e;
         } finally {
             iterator.close();
         }
-        return result;
+
+        return rollingKvWriter.result();
     }
 
     public void delete(DataFileMeta file) {
         FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
     }
 
-    private abstract class DataRollingFile extends RollingFile<KeyValue, 
DataFileMeta> {
+    private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
+
+        @Override
+        public BulkWriter<KeyValue> create(FSDataOutputStream out) throws 
IOException {
+            KeyValueSerializer serializer = new KeyValueSerializer(keyType, 
valueType);
+
+            return new BaseBulkWriter<>(writerFactory.create(out), 
serializer::toRow);
+        }
+    }
 
+    private class KvFileWriter extends BaseFileWriter<KeyValue, DataFileMeta> {
         private final int level;
-        private final KeyValueSerializer serializer;
         private final RowDataSerializer keySerializer;
 
-        private long rowCount;
-        private BinaryRowData minKey;
-        private RowData maxKey;
-        private long minSequenceNumber;
-        private long maxSequenceNumber;
-
-        private DataRollingFile(int level) {
-            // each level 0 data file is a sorted run,
-            // we must not write rolling files for level 0 data files
-            // otherwise we cannot reduce the number of sorted runs when 
compacting
-            super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
+        private FieldStatsCollector keyStatsCollector = null;
+        private FieldStatsCollector valueStatsCollector = null;
+
+        private BinaryRowData minKey = null;
+        private RowData maxKey = null;
+        private long minSeqNumber = Long.MAX_VALUE;
+        private long maxSeqNumber = Long.MIN_VALUE;
+
+        public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path 
path, int level)
+                throws IOException {
+            super(writerFactory, path);
+
             this.level = level;
-            this.serializer = new KeyValueSerializer(keyType, valueType);
             this.keySerializer = new RowDataSerializer(keyType);
-            resetMeta();
-        }
-
-        @Override
-        protected Path newPath() {
-            return pathFactory.newPath();
+            if (fileStatsExtractor == null) {
+                this.keyStatsCollector = new FieldStatsCollector(keyType);
+                this.valueStatsCollector = new FieldStatsCollector(valueType);
+            }
         }
 
         @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws 
IOException {
-            return writerFactory.create(out);
-        }
+        public void write(KeyValue kv) throws IOException {
+            super.write(kv);
 
-        @Override
-        protected RowData toRowData(KeyValue kv) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing key-value to data file, kv: " + 
kv.toString(keyType, valueType));
+            if (fileStatsExtractor == null) {
+                keyStatsCollector.collect(kv.key());
+                valueStatsCollector.collect(kv.value());
             }
 
-            rowCount++;
+            updateMinKey(kv);
+            updateMaxKey(kv);
+
+            updateMinSeqNumber(kv);
+            updateMaxSeqNumber(kv);
+        }
+
+        private void updateMinKey(KeyValue kv) {
             if (minKey == null) {
                 minKey = keySerializer.toBinaryRow(kv.key()).copy();
             }
-            maxKey = kv.key();
-            minSequenceNumber = Math.min(minSequenceNumber, 
kv.sequenceNumber());
-            maxSequenceNumber = Math.max(maxSequenceNumber, 
kv.sequenceNumber());
-
-            return serializer.toRow(kv);
         }
 
-        @Override
-        protected DataFileMeta collectFile(Path path) throws IOException {
-            KeyAndValueStats stats = extractStats(path);
-            DataFileMeta result =
-                    new DataFileMeta(
-                            path.getName(),
-                            FileUtils.getFileSize(path),
-                            rowCount,
-                            minKey,
-                            keySerializer.toBinaryRow(maxKey).copy(),
-                            stats.keyStats,
-                            stats.valueStats,
-                            minSequenceNumber,
-                            maxSequenceNumber,
-                            level);
-            resetMeta();
-            return result;
+        private void updateMaxKey(KeyValue kv) {
+            maxKey = kv.key();
         }
 
-        protected void resetMeta() {
-            rowCount = 0;
-            minKey = null;
-            maxKey = null;
-            minSequenceNumber = Long.MAX_VALUE;
-            maxSequenceNumber = Long.MIN_VALUE;
+        private void updateMinSeqNumber(KeyValue kv) {
+            minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber());
         }
 
-        protected abstract KeyAndValueStats extractStats(Path path);
-    }
-
-    private class FileExtractingRollingFile extends DataRollingFile {
-
-        private FileExtractingRollingFile(int level) {
-            super(level);
+        private void updateMaxSeqNumber(KeyValue kv) {
+            maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber());
         }
 
         @Override
-        protected KeyAndValueStats extractStats(Path path) {
-            FieldStats[] rawStats;
-            try {
-                rawStats = fileStatsExtractor.extract(path);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+        protected DataFileMeta createFileMeta(Path path) throws IOException {
+
+            FieldStats[] keyStats;
+            FieldStats[] valueStats;
+            if (fileStatsExtractor == null) {
+                keyStats = keyStatsCollector.extract();
+                valueStats = valueStatsCollector.extract();
+            } else {
+                FieldStats[] rowStats = fileStatsExtractor.extract(path);
+                int numKeyFields = keyType.getFieldCount();
+                keyStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
+                valueStats = Arrays.copyOfRange(rowStats, numKeyFields + 2, 
rowStats.length);
             }
 
-            int numKeyFields = keyType.getFieldCount();
-            return new KeyAndValueStats(
-                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
-                    Arrays.copyOfRange(rawStats, numKeyFields + 2, 
rawStats.length));
+            return new DataFileMeta(
+                    path.getName(),
+                    FileUtils.getFileSize(path),
+                    recordCount(),
+                    minKey,
+                    keySerializer.toBinaryRow(maxKey).copy(),
+                    keyStats,
+                    valueStats,
+                    minSeqNumber,
+                    maxSeqNumber,
+                    level);
         }
     }
 
-    private class StatsCollectingRollingFile extends DataRollingFile {
-
-        private FieldStatsCollector keyStatsCollector;
-        private FieldStatsCollector valueStatsCollector;
-
-        private StatsCollectingRollingFile(int level) {
-            super(level);
-        }
+    private static class RollingKvWriter extends RollingFileWriter<KeyValue, 
DataFileMeta> {
 
-        @Override
-        protected RowData toRowData(KeyValue kv) {
-            keyStatsCollector.collect(kv.key());
-            valueStatsCollector.collect(kv.value());
-            return super.toRowData(kv);
-        }
-
-        @Override
-        protected KeyAndValueStats extractStats(Path path) {
-            return new KeyAndValueStats(keyStatsCollector.extract(), 
valueStatsCollector.extract());
-        }
-
-        @Override
-        protected void resetMeta() {
-            super.resetMeta();
-            keyStatsCollector = new FieldStatsCollector(keyType);
-            valueStatsCollector = new FieldStatsCollector(valueType);
+        public RollingKvWriter(Supplier<KvFileWriter> writerFactory, long 
targetFileSize) {
+            super(writerFactory, targetFileSize);
         }
     }
 
-    private static class KeyAndValueStats {
-
-        private final FieldStats[] keyStats;
-        private final FieldStats[] valueStats;
+    private Supplier<KvFileWriter> createWriterFactory(int level) {
+        return () -> {
+            try {
+                return new KvFileWriter(new KvBulkWriterFactory(), 
pathFactory.newPath(), level);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        };
+    }
 
-        private KeyAndValueStats(FieldStats[] keyStats, FieldStats[] 
valueStats) {
-            this.keyStats = keyStats;
-            this.valueStats = valueStats;
-        }
+    private RollingKvWriter createRollingKvWriter(int level, long 
targetFileSize) {
+        return new RollingKvWriter(createWriterFactory(level), targetFileSize);
     }
 
     /** Creates {@link DataFileWriter}. */
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 55d60ea..48a5372 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -29,16 +29,19 @@ import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.RollingFile;
 import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
+import org.apache.flink.table.store.file.writer.BaseBulkWriter;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.function.Supplier;
 
 /**
  * This file includes several {@link ManifestEntry}s, representing the 
additional changes since last
@@ -90,48 +93,51 @@ public class ManifestFile {
      * <p>NOTE: This method is atomic.
      */
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
-        ManifestRollingFile rollingFile = new ManifestRollingFile();
-        List<ManifestFileMeta> result = new ArrayList<>();
-        List<Path> filesToCleanUp = new ArrayList<>();
-        try {
-            rollingFile.write(entries.iterator(), result, filesToCleanUp);
-        } catch (Throwable e) {
+
+        ManifestRollingWriter rollingWriter = 
createManifestRollingWriter(suggestedFileSize);
+        try (ManifestRollingWriter writer = rollingWriter) {
+            writer.write(entries);
+
+        } catch (Exception e) {
             LOG.warn("Exception occurs when writing manifest files. Cleaning 
up.", e);
-            for (Path path : filesToCleanUp) {
-                FileUtils.deleteOrWarn(path);
-            }
+
+            rollingWriter.abort();
             throw new RuntimeException(e);
         }
-        return result;
+
+        return rollingWriter.result();
     }
 
     public void delete(String fileName) {
         FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
     }
 
-    private class ManifestRollingFile extends RollingFile<ManifestEntry, 
ManifestFileMeta> {
-
-        private long numAddedFiles;
-        private long numDeletedFiles;
-        private FieldStatsCollector statsCollector;
-
-        private ManifestRollingFile() {
-            super(suggestedFileSize);
-            resetMeta();
-        }
+    private class ManifestEntryBulkWriterFactory implements 
BulkWriter.Factory<ManifestEntry> {
 
         @Override
-        protected Path newPath() {
-            return pathFactory.newManifestFile();
+        public BulkWriter<ManifestEntry> create(FSDataOutputStream out) throws 
IOException {
+            return new BaseBulkWriter<>(writerFactory.create(out), 
serializer::toRow);
         }
+    }
 
-        @Override
-        protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws 
IOException {
-            return writerFactory.create(out);
+    private class ManifestEntryWriter extends BaseFileWriter<ManifestEntry, 
ManifestFileMeta> {
+
+        private final FieldStatsCollector statsCollector;
+
+        private long numAddedFiles = 0;
+        private long numDeletedFiles = 0;
+
+        ManifestEntryWriter(BulkWriter.Factory<ManifestEntry> writerFactory, 
Path path)
+                throws IOException {
+            super(writerFactory, path);
+
+            this.statsCollector = new FieldStatsCollector(partitionType);
         }
 
         @Override
-        protected RowData toRowData(ManifestEntry entry) {
+        public void write(ManifestEntry entry) throws IOException {
+            super.write(entry);
+
             switch (entry.kind()) {
                 case ADD:
                     numAddedFiles++;
@@ -139,32 +145,48 @@ public class ManifestFile {
                 case DELETE:
                     numDeletedFiles++;
                     break;
+                default:
+                    throw new UnsupportedOperationException("Unknown entry 
kind: " + entry.kind());
             }
-            statsCollector.collect(entry.partition());
 
-            return serializer.toRow(entry);
+            statsCollector.collect(entry.partition());
         }
 
         @Override
-        protected ManifestFileMeta collectFile(Path path) throws IOException {
-            ManifestFileMeta result =
-                    new ManifestFileMeta(
-                            path.getName(),
-                            path.getFileSystem().getFileStatus(path).getLen(),
-                            numAddedFiles,
-                            numDeletedFiles,
-                            statsCollector.extract());
-            resetMeta();
-            return result;
+        protected ManifestFileMeta createFileMeta(Path path) throws 
IOException {
+            return new ManifestFileMeta(
+                    path.getName(),
+                    path.getFileSystem().getFileStatus(path).getLen(),
+                    numAddedFiles,
+                    numDeletedFiles,
+                    statsCollector.extract());
         }
+    }
 
-        private void resetMeta() {
-            numAddedFiles = 0;
-            numDeletedFiles = 0;
-            statsCollector = new FieldStatsCollector(partitionType);
+    private static class ManifestRollingWriter
+            extends RollingFileWriter<ManifestEntry, ManifestFileMeta> {
+
+        public ManifestRollingWriter(
+                Supplier<ManifestEntryWriter> writerFactory, long 
targetFileSize) {
+            super(writerFactory, targetFileSize);
         }
     }
 
+    private Supplier<ManifestEntryWriter> createWriterFactory() {
+        return () -> {
+            try {
+                return new ManifestEntryWriter(
+                        new ManifestEntryBulkWriterFactory(), 
pathFactory.newManifestFile());
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        };
+    }
+
+    private ManifestRollingWriter createManifestRollingWriter(long 
targetFileSize) {
+        return new ManifestRollingWriter(createWriterFactory(), 
targetFileSize);
+    }
+
     /**
     * Creator of {@link ManifestFile}. It reuses {@link BulkFormat} and 
{@link BulkWriter.Factory}
      * from {@link FileFormat}.
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 5f9e3bf..0c62407 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.util.CloseableIterator;
 
 import java.util.ArrayList;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 0e6b49f..253bf8f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 
 import java.util.concurrent.ExecutorService;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index b6a9409..bded00f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -36,7 +36,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
deleted file mode 100644
index 89ebaf0..0000000
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.utils;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A utility class to write a list of objects into several files, each with a 
size limit.
- *
- * @param <R> record type
- * @param <F> file meta type
- */
-public abstract class RollingFile<R, F> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(RollingFile.class);
-
-    private final long suggestedFileSize;
-
-    public RollingFile(long suggestedFileSize) {
-        this.suggestedFileSize = suggestedFileSize;
-    }
-
-    /** Create the path for a new file. */
-    protected abstract Path newPath();
-
-    /** Create a new object writer. Called per file. */
-    protected abstract BulkWriter<RowData> newWriter(FSDataOutputStream out) 
throws IOException;
-
-    /**
-     * Called before writing a record into file. Per-record calculation can be 
performed here.
-     *
-     * @param record record to write
-     * @return serialized record
-     */
-    protected abstract RowData toRowData(R record);
-
-    /** Called before closing the current file. Per-file calculation can be 
performed here. */
-    protected abstract F collectFile(Path path) throws IOException;
-
-    public void write(Iterator<R> iterator, List<F> result, List<Path> 
filesToCleanUp)
-            throws IOException {
-        Writer writer = null;
-        Path currentPath = null;
-
-        while (iterator.hasNext()) {
-            if (writer == null) {
-                // create new rolling file
-                currentPath = newPath();
-                filesToCleanUp.add(currentPath);
-                writer = new Writer(currentPath);
-            }
-
-            RowData serialized = toRowData(iterator.next());
-            writer.write(serialized);
-
-            if (writer.exceedsSuggestedFileSize()) {
-                // exceeds suggested size, close current file
-                writer.finish();
-                result.add(collectFile(currentPath));
-                writer = null;
-            }
-        }
-
-        // finish last file
-        if (writer != null) {
-            writer.finish();
-            result.add(collectFile(currentPath));
-        }
-    }
-
-    private class Writer {
-        private final FSDataOutputStream out;
-        private final BulkWriter<RowData> writer;
-
-        private Writer(Path path) throws IOException {
-            this.out = path.getFileSystem().create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
-            this.writer = newWriter(out);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Create new rolling file " + path);
-            }
-        }
-
-        private void write(RowData record) throws IOException {
-            writer.addElement(record);
-        }
-
-        private boolean exceedsSuggestedFileSize() throws IOException {
-            // NOTE: this method is inaccurate for formats buffering changes 
in memory
-            return out.getPos() >= suggestedFileSize;
-        }
-
-        private void finish() throws IOException {
-            writer.finish();
-            out.close();
-        }
-    }
-}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseBulkWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseBulkWriter.java
new file mode 100644
index 0000000..aad9d70
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseBulkWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Base bulk writer that delegates records of a generic data type to an underlying {@link RowData} writer.
+ *
+ * @param <T> generic record type.
+ */
+public class BaseBulkWriter<T> implements BulkWriter<T> {
+
+    private final BulkWriter<RowData> writer;
+
+    // Convert the record from the generic type T to the concrete RowData type.
+    private final Function<T, RowData> converter;
+
+    public BaseBulkWriter(BulkWriter<RowData> writer, Function<T, RowData> 
converter) {
+        this.writer = writer;
+        this.converter = converter;
+    }
+
+    @Override
+    public void addElement(T elem) throws IOException {
+        writer.addElement(converter.apply(elem));
+    }
+
+    @Override
+    public void flush() throws IOException {
+        writer.flush();
+    }
+
+    @Override
+    public void finish() throws IOException {
+        writer.finish();
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
new file mode 100644
index 0000000..03f0790
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Abstract base implementation of the {@link FileWriter} interface.
+ *
+ * @param <T> record data type.
+ * @param <R> file meta data type.
+ */
+public abstract class BaseFileWriter<T, R> implements FileWriter<T, R> {
+
+    private final BulkWriter.Factory<T> writerFactory;
+    private final Path path;
+
+    private long recordCount;
+    private FSDataOutputStream currentOut = null;
+    private BulkWriter<T> currentWriter = null;
+
+    private boolean closed = false;
+
+    public BaseFileWriter(BulkWriter.Factory<T> writerFactory, Path path) {
+        this.writerFactory = writerFactory;
+        this.path = path;
+
+        this.recordCount = 0;
+    }
+
+    public Path path() {
+        return path;
+    }
+
+    private void openCurrentWriter() throws IOException {
+        this.currentOut = path.getFileSystem().create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
+        this.currentWriter = writerFactory.create(currentOut);
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        if (currentWriter == null) {
+            openCurrentWriter();
+        }
+
+        currentWriter.addElement(row);
+        recordCount += 1;
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        if (currentOut != null) {
+            return currentOut.getPos();
+        }
+        return 0;
+    }
+
+    protected abstract R createFileMeta(Path path) throws IOException;
+
+    @Override
+    public void abort() {
+        IOUtils.closeQuietly(this);
+
+        // Abort to clean the orphan file.
+        FileUtils.deleteOrWarn(path);
+    }
+
+    @Override
+    public R result() throws IOException {
+        Preconditions.checkState(closed, "Cannot access the file meta unless 
close this writer.");
+
+        return createFileMeta(path);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            if (currentWriter != null) {
+                currentWriter.finish();
+                currentWriter = null;
+            }
+
+            if (currentOut != null) {
+                currentOut.close();
+                currentOut = null;
+            }
+
+            closed = true;
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
new file mode 100644
index 0000000..856f99b
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * File writer to accept one record or a batch of records and generate metadata after closing it.
+ *
+ * @param <T> record type.
+ * @param <R> file result to collect.
+ */
+public interface FileWriter<T, R> extends Closeable {
+
+    /**
+     * Add only one record to this file writer.
+     *
+     * @param record to write.
+     * @throws IOException if encounter any IO error.
+     */
+    void write(T record) throws IOException;
+
+    /**
+     * Add records from {@link Iterator} to this file writer.
+     *
+     * @param records to write
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterator<T> records) throws IOException {
+        while (records.hasNext()) {
+            write(records.next());
+        }
+    }
+
+    /**
+     * Add records from {@link Iterable} to file writer.
+     *
+     * @param records to write.
+     * @throws IOException if encounter any IO error.
+     */
+    default void write(Iterable<T> records) throws IOException {
+        for (T record : records) {
+            write(record);
+        }
+    }
+
+    /**
+     * The total written record count.
+     *
+     * @return record count.
+     */
+    long recordCount();
+
+    /**
+     * The estimated length of the current writer.
+     *
+     * @return the estimated length.
+     * @throws IOException if encounter any IO error.
+     */
+    long length() throws IOException;
+
+    /** Abort the writer and clean up any orphan file(s) when an error is encountered. */
+    void abort();
+
+    /** @return the result for this closed file writer. */
+    R result() throws IOException;
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
similarity index 97%
rename from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
rename to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
index 4d28832..4f29e27 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.writer;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.ValueKind;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
new file mode 100644
index 0000000..298d501
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceeds the target file size.
+ *
+ * @param <T> record data type.
+ * @param <R> the file metadata result.
+ */
+public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
+
+    private final Supplier<? extends FileWriter<T, R>> writerFactory;
+    private final long targetFileSize;
+    private final List<FileWriter<T, R>> openedWriters;
+    private final List<R> results;
+
+    private FileWriter<T, R> currentWriter = null;
+    private long lengthOfClosedFiles = 0L;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public RollingFileWriter(
+            Supplier<? extends FileWriter<T, R>> writerFactory, long 
targetFileSize) {
+        this.writerFactory = writerFactory;
+        this.targetFileSize = targetFileSize;
+        this.openedWriters = new ArrayList<>();
+        this.results = new ArrayList<>();
+    }
+
+    @Override
+    public void write(T row) throws IOException {
+        // Open the current writer when writing the first record, or after a previous roll-over closed it.
+        if (currentWriter == null) {
+            openCurrentWriter();
+        }
+
+        currentWriter.write(row);
+        recordCount += 1;
+
+        if (currentWriter.length() >= targetFileSize) {
+            closeCurrentWriter();
+        }
+    }
+
+    private void openCurrentWriter() {
+        currentWriter = writerFactory.get();
+        openedWriters.add(currentWriter);
+    }
+
+    private void closeCurrentWriter() {
+        if (currentWriter != null) {
+            try {
+                lengthOfClosedFiles += currentWriter.length();
+
+                currentWriter.close();
+                results.add(currentWriter.result());
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+
+            currentWriter = null;
+        }
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        long length = lengthOfClosedFiles;
+        if (currentWriter != null) {
+            length += currentWriter.length();
+        }
+
+        return length;
+    }
+
+    @Override
+    public void abort() {
+        IOUtils.closeQuietly(this);
+
+        // Abort all those writers.
+        for (FileWriter<T, R> writer : openedWriters) {
+            writer.abort();
+        }
+    }
+
+    @Override
+    public List<R> result() {
+        Preconditions.checkState(closed, "Cannot access the results unless 
close all writers.");
+
+        return results;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            closeCurrentWriter();
+
+            closed = true;
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 826018a..015050d 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -39,8 +39,8 @@ import 
org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.function.QuadFunction;
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index b88ee01..4e48f84 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -40,7 +40,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 

Reply via email to