This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 1fedb65 [FLINK-27517] Introduce rolling file writer to write one
record each time for append-only table
1fedb65 is described below
commit 1fedb6586ba292bedf00bb1b6922d5110a2f7fa8
Author: openinx <[email protected]>
AuthorDate: Sun May 8 20:02:20 2022 +0800
[FLINK-27517] Introduce rolling file writer to write one record each time
for append-only table
This closes #110
---
.../store/connector/sink/StoreSinkWriter.java | 2 +-
.../table/store/connector/sink/StoreSinkTest.java | 2 +-
.../table/store/connector/sink/TestFileStore.java | 2 +-
.../source/FileStoreSourceSplitReaderTest.java | 2 +-
.../store/connector/source/TestDataReadWrite.java | 2 +-
.../table/store/file/data/DataFileWriter.java | 218 +++++++++------------
.../table/store/file/manifest/ManifestFile.java | 110 ++++++-----
.../store/file/mergetree/MergeTreeWriter.java | 2 +-
.../table/store/file/operation/FileStoreWrite.java | 2 +-
.../store/file/operation/FileStoreWriteImpl.java | 2 +-
.../flink/table/store/file/utils/RollingFile.java | 125 ------------
.../table/store/file/writer/BaseBulkWriter.java | 59 ++++++
.../table/store/file/writer/BaseFileWriter.java | 121 ++++++++++++
.../flink/table/store/file/writer/FileWriter.java | 86 ++++++++
.../store/file/{utils => writer}/RecordWriter.java | 2 +-
.../table/store/file/writer/RollingFileWriter.java | 132 +++++++++++++
.../flink/table/store/file/TestFileStore.java | 2 +-
.../table/store/file/mergetree/MergeTreeTest.java | 2 +-
18 files changed, 572 insertions(+), 301 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 2995b74..4a114b4 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.sink.SinkRecord;
import org.apache.flink.table.store.sink.SinkRecordConverter;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index de48269..303e471 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import
org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 3c98e3b..639ef35 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -32,7 +32,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.stats.FieldStats;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index acaac22..7002666 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.AfterAll;
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index abc1922..b6997e8 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -33,7 +33,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index a57e6df..b46db7f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -33,7 +33,9 @@ import
org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.RollingFile;
+import org.apache.flink.table.store.file.writer.BaseBulkWriter;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CloseableIterator;
@@ -41,9 +43,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Supplier;
/** Writes {@link KeyValue}s into data files. */
public class DataFileWriter {
@@ -91,176 +94,149 @@ public class DataFileWriter {
}
/**
- * Write several {@link KeyValue}s into an data file of a given level.
+ * Write several {@link KeyValue}s into a data file of a given level.
*
* <p>NOTE: This method is atomic.
*/
public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int
level)
throws Exception {
- DataRollingFile rollingFile =
- fileStatsExtractor == null
- ? new StatsCollectingRollingFile(level)
- : new FileExtractingRollingFile(level);
- List<DataFileMeta> result = new ArrayList<>();
- List<Path> filesToCleanUp = new ArrayList<>();
- try {
- rollingFile.write(iterator, result, filesToCleanUp);
+
+ RollingKvWriter rollingKvWriter = createRollingKvWriter(level,
suggestedFileSize);
+ try (RollingKvWriter writer = rollingKvWriter) {
+ writer.write(iterator);
+
} catch (Throwable e) {
LOG.warn("Exception occurs when writing data files. Cleaning up.",
e);
- for (Path path : filesToCleanUp) {
- FileUtils.deleteOrWarn(path);
- }
+
+ rollingKvWriter.abort();
throw e;
} finally {
iterator.close();
}
- return result;
+
+ return rollingKvWriter.result();
}
public void delete(DataFileMeta file) {
FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
}
- private abstract class DataRollingFile extends RollingFile<KeyValue,
DataFileMeta> {
+ private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
+
+ @Override
+ public BulkWriter<KeyValue> create(FSDataOutputStream out) throws
IOException {
+ KeyValueSerializer serializer = new KeyValueSerializer(keyType,
valueType);
+
+ return new BaseBulkWriter<>(writerFactory.create(out),
serializer::toRow);
+ }
+ }
+ private class KvFileWriter extends BaseFileWriter<KeyValue, DataFileMeta> {
private final int level;
- private final KeyValueSerializer serializer;
private final RowDataSerializer keySerializer;
- private long rowCount;
- private BinaryRowData minKey;
- private RowData maxKey;
- private long minSequenceNumber;
- private long maxSequenceNumber;
-
- private DataRollingFile(int level) {
- // each level 0 data file is a sorted run,
- // we must not write rolling files for level 0 data files
- // otherwise we cannot reduce the number of sorted runs when
compacting
- super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
+ private FieldStatsCollector keyStatsCollector = null;
+ private FieldStatsCollector valueStatsCollector = null;
+
+ private BinaryRowData minKey = null;
+ private RowData maxKey = null;
+ private long minSeqNumber = Long.MAX_VALUE;
+ private long maxSeqNumber = Long.MIN_VALUE;
+
+ public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path
path, int level)
+ throws IOException {
+ super(writerFactory, path);
+
this.level = level;
- this.serializer = new KeyValueSerializer(keyType, valueType);
this.keySerializer = new RowDataSerializer(keyType);
- resetMeta();
- }
-
- @Override
- protected Path newPath() {
- return pathFactory.newPath();
+ if (fileStatsExtractor == null) {
+ this.keyStatsCollector = new FieldStatsCollector(keyType);
+ this.valueStatsCollector = new FieldStatsCollector(valueType);
+ }
}
@Override
- protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws
IOException {
- return writerFactory.create(out);
- }
+ public void write(KeyValue kv) throws IOException {
+ super.write(kv);
- @Override
- protected RowData toRowData(KeyValue kv) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing key-value to data file, kv: " +
kv.toString(keyType, valueType));
+ if (fileStatsExtractor == null) {
+ keyStatsCollector.collect(kv.key());
+ valueStatsCollector.collect(kv.value());
}
- rowCount++;
+ updateMinKey(kv);
+ updateMaxKey(kv);
+
+ updateMinSeqNumber(kv);
+ updateMaxSeqNumber(kv);
+ }
+
+ private void updateMinKey(KeyValue kv) {
if (minKey == null) {
minKey = keySerializer.toBinaryRow(kv.key()).copy();
}
- maxKey = kv.key();
- minSequenceNumber = Math.min(minSequenceNumber,
kv.sequenceNumber());
- maxSequenceNumber = Math.max(maxSequenceNumber,
kv.sequenceNumber());
-
- return serializer.toRow(kv);
}
- @Override
- protected DataFileMeta collectFile(Path path) throws IOException {
- KeyAndValueStats stats = extractStats(path);
- DataFileMeta result =
- new DataFileMeta(
- path.getName(),
- FileUtils.getFileSize(path),
- rowCount,
- minKey,
- keySerializer.toBinaryRow(maxKey).copy(),
- stats.keyStats,
- stats.valueStats,
- minSequenceNumber,
- maxSequenceNumber,
- level);
- resetMeta();
- return result;
+ private void updateMaxKey(KeyValue kv) {
+ maxKey = kv.key();
}
- protected void resetMeta() {
- rowCount = 0;
- minKey = null;
- maxKey = null;
- minSequenceNumber = Long.MAX_VALUE;
- maxSequenceNumber = Long.MIN_VALUE;
+ private void updateMinSeqNumber(KeyValue kv) {
+ minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber());
}
- protected abstract KeyAndValueStats extractStats(Path path);
- }
-
- private class FileExtractingRollingFile extends DataRollingFile {
-
- private FileExtractingRollingFile(int level) {
- super(level);
+ private void updateMaxSeqNumber(KeyValue kv) {
+ maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber());
}
@Override
- protected KeyAndValueStats extractStats(Path path) {
- FieldStats[] rawStats;
- try {
- rawStats = fileStatsExtractor.extract(path);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ protected DataFileMeta createFileMeta(Path path) throws IOException {
+
+ FieldStats[] keyStats;
+ FieldStats[] valueStats;
+ if (fileStatsExtractor == null) {
+ keyStats = keyStatsCollector.extract();
+ valueStats = valueStatsCollector.extract();
+ } else {
+ FieldStats[] rowStats = fileStatsExtractor.extract(path);
+ int numKeyFields = keyType.getFieldCount();
+ keyStats = Arrays.copyOfRange(rowStats, 0, numKeyFields);
+ valueStats = Arrays.copyOfRange(rowStats, numKeyFields + 2,
rowStats.length);
}
- int numKeyFields = keyType.getFieldCount();
- return new KeyAndValueStats(
- Arrays.copyOfRange(rawStats, 0, numKeyFields),
- Arrays.copyOfRange(rawStats, numKeyFields + 2,
rawStats.length));
+ return new DataFileMeta(
+ path.getName(),
+ FileUtils.getFileSize(path),
+ recordCount(),
+ minKey,
+ keySerializer.toBinaryRow(maxKey).copy(),
+ keyStats,
+ valueStats,
+ minSeqNumber,
+ maxSeqNumber,
+ level);
}
}
- private class StatsCollectingRollingFile extends DataRollingFile {
-
- private FieldStatsCollector keyStatsCollector;
- private FieldStatsCollector valueStatsCollector;
-
- private StatsCollectingRollingFile(int level) {
- super(level);
- }
+ private static class RollingKvWriter extends RollingFileWriter<KeyValue,
DataFileMeta> {
- @Override
- protected RowData toRowData(KeyValue kv) {
- keyStatsCollector.collect(kv.key());
- valueStatsCollector.collect(kv.value());
- return super.toRowData(kv);
- }
-
- @Override
- protected KeyAndValueStats extractStats(Path path) {
- return new KeyAndValueStats(keyStatsCollector.extract(),
valueStatsCollector.extract());
- }
-
- @Override
- protected void resetMeta() {
- super.resetMeta();
- keyStatsCollector = new FieldStatsCollector(keyType);
- valueStatsCollector = new FieldStatsCollector(valueType);
+ public RollingKvWriter(Supplier<KvFileWriter> writerFactory, long
targetFileSize) {
+ super(writerFactory, targetFileSize);
}
}
- private static class KeyAndValueStats {
-
- private final FieldStats[] keyStats;
- private final FieldStats[] valueStats;
+ private Supplier<KvFileWriter> createWriterFactory(int level) {
+ return () -> {
+ try {
+ return new KvFileWriter(new KvBulkWriterFactory(),
pathFactory.newPath(), level);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ };
+ }
- private KeyAndValueStats(FieldStats[] keyStats, FieldStats[]
valueStats) {
- this.keyStats = keyStats;
- this.valueStats = valueStats;
- }
+ private RollingKvWriter createRollingKvWriter(int level, long
targetFileSize) {
+ return new RollingKvWriter(createWriterFactory(level), targetFileSize);
}
/** Creates {@link DataFileWriter}. */
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 55d60ea..48a5372 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -29,16 +29,19 @@ import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.utils.RollingFile;
import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
+import org.apache.flink.table.store.file.writer.BaseBulkWriter;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
+import java.io.UncheckedIOException;
import java.util.List;
+import java.util.function.Supplier;
/**
* This file includes several {@link ManifestEntry}s, representing the
additional changes since last
@@ -90,48 +93,51 @@ public class ManifestFile {
* <p>NOTE: This method is atomic.
*/
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
- ManifestRollingFile rollingFile = new ManifestRollingFile();
- List<ManifestFileMeta> result = new ArrayList<>();
- List<Path> filesToCleanUp = new ArrayList<>();
- try {
- rollingFile.write(entries.iterator(), result, filesToCleanUp);
- } catch (Throwable e) {
+
+ ManifestRollingWriter rollingWriter =
createManifestRollingWriter(suggestedFileSize);
+ try (ManifestRollingWriter writer = rollingWriter) {
+ writer.write(entries);
+
+ } catch (Exception e) {
LOG.warn("Exception occurs when writing manifest files. Cleaning
up.", e);
- for (Path path : filesToCleanUp) {
- FileUtils.deleteOrWarn(path);
- }
+
+ rollingWriter.abort();
throw new RuntimeException(e);
}
- return result;
+
+ return rollingWriter.result();
}
public void delete(String fileName) {
FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
}
- private class ManifestRollingFile extends RollingFile<ManifestEntry,
ManifestFileMeta> {
-
- private long numAddedFiles;
- private long numDeletedFiles;
- private FieldStatsCollector statsCollector;
-
- private ManifestRollingFile() {
- super(suggestedFileSize);
- resetMeta();
- }
+ private class ManifestEntryBulkWriterFactory implements
BulkWriter.Factory<ManifestEntry> {
@Override
- protected Path newPath() {
- return pathFactory.newManifestFile();
+ public BulkWriter<ManifestEntry> create(FSDataOutputStream out) throws
IOException {
+ return new BaseBulkWriter<>(writerFactory.create(out),
serializer::toRow);
}
+ }
- @Override
- protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws
IOException {
- return writerFactory.create(out);
+ private class ManifestEntryWriter extends BaseFileWriter<ManifestEntry,
ManifestFileMeta> {
+
+ private final FieldStatsCollector statsCollector;
+
+ private long numAddedFiles = 0;
+ private long numDeletedFiles = 0;
+
+ ManifestEntryWriter(BulkWriter.Factory<ManifestEntry> writerFactory,
Path path)
+ throws IOException {
+ super(writerFactory, path);
+
+ this.statsCollector = new FieldStatsCollector(partitionType);
}
@Override
- protected RowData toRowData(ManifestEntry entry) {
+ public void write(ManifestEntry entry) throws IOException {
+ super.write(entry);
+
switch (entry.kind()) {
case ADD:
numAddedFiles++;
@@ -139,32 +145,48 @@ public class ManifestFile {
case DELETE:
numDeletedFiles++;
break;
+ default:
+ throw new UnsupportedOperationException("Unknown entry
kind: " + entry.kind());
}
- statsCollector.collect(entry.partition());
- return serializer.toRow(entry);
+ statsCollector.collect(entry.partition());
}
@Override
- protected ManifestFileMeta collectFile(Path path) throws IOException {
- ManifestFileMeta result =
- new ManifestFileMeta(
- path.getName(),
- path.getFileSystem().getFileStatus(path).getLen(),
- numAddedFiles,
- numDeletedFiles,
- statsCollector.extract());
- resetMeta();
- return result;
+ protected ManifestFileMeta createFileMeta(Path path) throws
IOException {
+ return new ManifestFileMeta(
+ path.getName(),
+ path.getFileSystem().getFileStatus(path).getLen(),
+ numAddedFiles,
+ numDeletedFiles,
+ statsCollector.extract());
}
+ }
- private void resetMeta() {
- numAddedFiles = 0;
- numDeletedFiles = 0;
- statsCollector = new FieldStatsCollector(partitionType);
+ private static class ManifestRollingWriter
+ extends RollingFileWriter<ManifestEntry, ManifestFileMeta> {
+
+ public ManifestRollingWriter(
+ Supplier<ManifestEntryWriter> writerFactory, long
targetFileSize) {
+ super(writerFactory, targetFileSize);
}
}
+ private Supplier<ManifestEntryWriter> createWriterFactory() {
+ return () -> {
+ try {
+ return new ManifestEntryWriter(
+ new ManifestEntryBulkWriterFactory(),
pathFactory.newManifestFile());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ };
+ }
+
+ private ManifestRollingWriter createManifestRollingWriter(long
targetFileSize) {
+ return new ManifestRollingWriter(createWriterFactory(),
targetFileSize);
+ }
+
/**
 * Creator of {@link ManifestFile}. It reuses {@link BulkFormat} and
 {@link BulkWriter.Factory}
 * from {@link FileFormat}.
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 5f9e3bf..0c62407 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 0e6b49f..253bf8f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import java.util.concurrent.ExecutorService;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index b6a9409..bded00f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -36,7 +36,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
deleted file mode 100644
index 89ebaf0..0000000
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RollingFile.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.utils;
-
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A utility class to write a list of objects into several files, each with a
size limit.
- *
- * @param <R> record type
- * @param <F> file meta type
- */
-public abstract class RollingFile<R, F> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RollingFile.class);
-
- private final long suggestedFileSize;
-
- public RollingFile(long suggestedFileSize) {
- this.suggestedFileSize = suggestedFileSize;
- }
-
- /** Create the path for a new file. */
- protected abstract Path newPath();
-
- /** Create a new object writer. Called per file. */
- protected abstract BulkWriter<RowData> newWriter(FSDataOutputStream out)
throws IOException;
-
- /**
- * Called before writing a record into file. Per-record calculation can be
performed here.
- *
- * @param record record to write
- * @return serialized record
- */
- protected abstract RowData toRowData(R record);
-
- /** Called before closing the current file. Per-file calculation can be
performed here. */
- protected abstract F collectFile(Path path) throws IOException;
-
- public void write(Iterator<R> iterator, List<F> result, List<Path>
filesToCleanUp)
- throws IOException {
- Writer writer = null;
- Path currentPath = null;
-
- while (iterator.hasNext()) {
- if (writer == null) {
- // create new rolling file
- currentPath = newPath();
- filesToCleanUp.add(currentPath);
- writer = new Writer(currentPath);
- }
-
- RowData serialized = toRowData(iterator.next());
- writer.write(serialized);
-
- if (writer.exceedsSuggestedFileSize()) {
- // exceeds suggested size, close current file
- writer.finish();
- result.add(collectFile(currentPath));
- writer = null;
- }
- }
-
- // finish last file
- if (writer != null) {
- writer.finish();
- result.add(collectFile(currentPath));
- }
- }
-
- private class Writer {
- private final FSDataOutputStream out;
- private final BulkWriter<RowData> writer;
-
- private Writer(Path path) throws IOException {
- this.out = path.getFileSystem().create(path,
FileSystem.WriteMode.NO_OVERWRITE);
- this.writer = newWriter(out);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create new rolling file " + path);
- }
- }
-
- private void write(RowData record) throws IOException {
- writer.addElement(record);
- }
-
- private boolean exceedsSuggestedFileSize() throws IOException {
- // NOTE: this method is inaccurate for formats buffering changes
in memory
- return out.getPos() >= suggestedFileSize;
- }
-
- private void finish() throws IOException {
- writer.finish();
- out.close();
- }
- }
-}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseBulkWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseBulkWriter.java
new file mode 100644
index 0000000..aad9d70
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseBulkWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Base bulk writer to delegate the generic data type writing into the {@link
RowData} writer.
+ *
+ * @param <T> generic record type.
+ */
+public class BaseBulkWriter<T> implements BulkWriter<T> {
+
+ private final BulkWriter<RowData> writer;
+
+ // Convert the record from the generic type T to the concrete RowData type.
+ private final Function<T, RowData> converter;
+
+ public BaseBulkWriter(BulkWriter<RowData> writer, Function<T, RowData>
converter) {
+ this.writer = writer;
+ this.converter = converter;
+ }
+
+ @Override
+ public void addElement(T elem) throws IOException {
+ writer.addElement(converter.apply(elem));
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writer.flush();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ writer.finish();
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
new file mode 100644
index 0000000..03f0790
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The abstracted base file writer implementation for {@link FileWriter}.
+ *
+ * @param <T> record data type.
+ * @param <R> file meta data type.
+ */
+public abstract class BaseFileWriter<T, R> implements FileWriter<T, R> {
+
+ private final BulkWriter.Factory<T> writerFactory;
+ private final Path path;
+
+ private long recordCount;
+ private FSDataOutputStream currentOut = null;
+ private BulkWriter<T> currentWriter = null;
+
+ private boolean closed = false;
+
+ public BaseFileWriter(BulkWriter.Factory<T> writerFactory, Path path) {
+ this.writerFactory = writerFactory;
+ this.path = path;
+
+ this.recordCount = 0;
+ }
+
+ public Path path() {
+ return path;
+ }
+
+ private void openCurrentWriter() throws IOException {
+ this.currentOut = path.getFileSystem().create(path,
FileSystem.WriteMode.NO_OVERWRITE);
+ this.currentWriter = writerFactory.create(currentOut);
+ }
+
+ @Override
+ public void write(T row) throws IOException {
+ if (currentWriter == null) {
+ openCurrentWriter();
+ }
+
+ currentWriter.addElement(row);
+ recordCount += 1;
+ }
+
+ @Override
+ public long recordCount() {
+ return recordCount;
+ }
+
+ @Override
+ public long length() throws IOException {
+ if (currentOut != null) {
+ return currentOut.getPos();
+ }
+ return 0;
+ }
+
+ protected abstract R createFileMeta(Path path) throws IOException;
+
+ @Override
+ public void abort() {
+ IOUtils.closeQuietly(this);
+
+ // Abort to clean the orphan file.
+ FileUtils.deleteOrWarn(path);
+ }
+
+ @Override
+ public R result() throws IOException {
+ Preconditions.checkState(closed, "Cannot access the file meta unless
close this writer.");
+
+ return createFileMeta(path);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ if (currentWriter != null) {
+ currentWriter.finish();
+ currentWriter = null;
+ }
+
+ if (currentOut != null) {
+ currentOut.close();
+ currentOut = null;
+ }
+
+ closed = true;
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
new file mode 100644
index 0000000..856f99b
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
/**
 * File writer that accepts a single record or a batch of records and exposes the collected file
 * metadata once the writer has been closed.
 *
 * @param <T> record type.
 * @param <R> file result to collect.
 */
public interface FileWriter<T, R> extends Closeable {

    /**
     * Writes a single record into this file.
     *
     * @param record the record to write.
     * @throws IOException if any IO error occurs.
     */
    void write(T record) throws IOException;

    /**
     * Writes every record produced by the given {@link Iterator}.
     *
     * @param records the records to write.
     * @throws IOException if any IO error occurs.
     */
    default void write(Iterator<T> records) throws IOException {
        while (records.hasNext()) {
            write(records.next());
        }
    }

    /**
     * Writes every record contained in the given {@link Iterable}.
     *
     * @param records the records to write.
     * @throws IOException if any IO error occurs.
     */
    default void write(Iterable<T> records) throws IOException {
        // Delegate to the iterator overload; identical to iterating the Iterable directly.
        write(records.iterator());
    }

    /**
     * Returns the total number of records written so far.
     *
     * @return record count.
     */
    long recordCount();

    /**
     * Returns the estimated length of the data written by this writer so far.
     *
     * @return the estimated length.
     * @throws IOException if any IO error occurs.
     */
    long length() throws IOException;

    /** Aborts this writer to clear any orphan file(s) after an error. */
    void abort();

    /** @return the result for this closed file writer. */
    R result() throws IOException;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
similarity index 97%
rename from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
rename to
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
index 4d28832..4f29e27 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.writer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.ValueKind;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
new file mode 100644
index 0000000..298d501
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Writer to roll over to a new file if the current size exceed the target
file size.
+ *
+ * @param <T> record data type.
+ * @param <R> the file metadata result.
+ */
+public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
+
+ private final Supplier<? extends FileWriter<T, R>> writerFactory;
+ private final long targetFileSize;
+ private final List<FileWriter<T, R>> openedWriters;
+ private final List<R> results;
+
+ private FileWriter<T, R> currentWriter = null;
+ private long lengthOfClosedFiles = 0L;
+ private long recordCount = 0;
+ private boolean closed = false;
+
+ public RollingFileWriter(
+ Supplier<? extends FileWriter<T, R>> writerFactory, long
targetFileSize) {
+ this.writerFactory = writerFactory;
+ this.targetFileSize = targetFileSize;
+ this.openedWriters = new ArrayList<>();
+ this.results = new ArrayList<>();
+ }
+
+ @Override
+ public void write(T row) throws IOException {
+ // Open the current writer if write the first record or roll over
happen before.
+ if (currentWriter == null) {
+ openCurrentWriter();
+ }
+
+ currentWriter.write(row);
+ recordCount += 1;
+
+ if (currentWriter.length() >= targetFileSize) {
+ closeCurrentWriter();
+ }
+ }
+
+ private void openCurrentWriter() {
+ currentWriter = writerFactory.get();
+ openedWriters.add(currentWriter);
+ }
+
+ private void closeCurrentWriter() {
+ if (currentWriter != null) {
+ try {
+ lengthOfClosedFiles += currentWriter.length();
+
+ currentWriter.close();
+ results.add(currentWriter.result());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ currentWriter = null;
+ }
+ }
+
+ @Override
+ public long recordCount() {
+ return recordCount;
+ }
+
+ @Override
+ public long length() throws IOException {
+ long length = lengthOfClosedFiles;
+ if (currentWriter != null) {
+ length += currentWriter.length();
+ }
+
+ return length;
+ }
+
+ @Override
+ public void abort() {
+ IOUtils.closeQuietly(this);
+
+ // Abort all those writers.
+ for (FileWriter<T, R> writer : openedWriters) {
+ writer.abort();
+ }
+ }
+
+ @Override
+ public List<R> result() {
+ Preconditions.checkState(closed, "Cannot access the results unless
close all writers.");
+
+ return results;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closeCurrentWriter();
+
+ closed = true;
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 826018a..015050d 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -39,8 +39,8 @@ import
org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.QuadFunction;
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index b88ee01..4e48f84 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -40,7 +40,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;