JingsongLi commented on code in PR #110:
URL: https://github.com/apache/flink-table-store/pull/110#discussion_r867479945
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java:
##########
@@ -91,176 +94,149 @@ public DataFilePathFactory pathFactory() {
}
/**
- * Write several {@link KeyValue}s into an data file of a given level.
+ * Write several {@link KeyValue}s into a data file of a given level.
*
* <p>NOTE: This method is atomic.
*/
public List<DataFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
        throws Exception {
- DataRollingFile rollingFile =
- fileStatsExtractor == null
- ? new StatsCollectingRollingFile(level)
- : new FileExtractingRollingFile(level);
- List<DataFileMeta> result = new ArrayList<>();
- List<Path> filesToCleanUp = new ArrayList<>();
- try {
- rollingFile.write(iterator, result, filesToCleanUp);
+
+ RollingKvWriter rollingKvWriter = createRollingKvWriter(level, suggestedFileSize);
+ try (RollingKvWriter writer = rollingKvWriter) {
+ writer.write(iterator);
+
} catch (Throwable e) {
LOG.warn("Exception occurs when writing data files. Cleaning up.",
e);
- for (Path path : filesToCleanUp) {
- FileUtils.deleteOrWarn(path);
- }
+
+ rollingKvWriter.abort();
throw e;
} finally {
iterator.close();
}
- return result;
+
+ return rollingKvWriter.result();
}
public void delete(DataFileMeta file) {
FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
}
- private abstract class DataRollingFile extends RollingFile<KeyValue, DataFileMeta> {
+ private class KvBulkWriterFactory implements BulkWriter.Factory<KeyValue> {
+
+ @Override
+ public BulkWriter<KeyValue> create(FSDataOutputStream out) throws IOException {
+ KeyValueSerializer serializer = new KeyValueSerializer(keyType, valueType);
+
+ return new BaseBulkWriter<>(writerFactory.create(out), serializer::toRow);
+ }
+ }
+ private class KvFileWriter extends BaseFileWriter<KeyValue, DataFileMeta> {
private final int level;
- private final KeyValueSerializer serializer;
private final RowDataSerializer keySerializer;
- private long rowCount;
- private BinaryRowData minKey;
- private RowData maxKey;
- private long minSequenceNumber;
- private long maxSequenceNumber;
-
- private DataRollingFile(int level) {
- // each level 0 data file is a sorted run,
- // we must not write rolling files for level 0 data files
- // otherwise we cannot reduce the number of sorted runs when compacting
- super(level == 0 ? Long.MAX_VALUE : suggestedFileSize);
+ private FieldStatsCollector keyStatsCollector = null;
+ private FieldStatsCollector valueStatsCollector = null;
+
+ private BinaryRowData minKey = null;
+ private RowData maxKey = null;
+ private long minSeqNumber = Long.MAX_VALUE;
+ private long maxSeqNumber = Long.MIN_VALUE;
+
+ public KvFileWriter(BulkWriter.Factory<KeyValue> writerFactory, Path path, int level)
+ throws IOException {
+ super(writerFactory, path);
+
this.level = level;
- this.serializer = new KeyValueSerializer(keyType, valueType);
this.keySerializer = new RowDataSerializer(keyType);
- resetMeta();
- }
-
- @Override
- protected Path newPath() {
- return pathFactory.newPath();
+ if (fileStatsExtractor == null) {
+ this.keyStatsCollector = new FieldStatsCollector(keyType);
+ this.valueStatsCollector = new FieldStatsCollector(valueType);
+ }
}
@Override
- protected BulkWriter<RowData> newWriter(FSDataOutputStream out) throws IOException {
- return writerFactory.create(out);
- }
+ public void write(KeyValue kv) throws IOException {
+ super.write(kv);
- @Override
- protected RowData toRowData(KeyValue kv) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing key-value to data file, kv: " +
kv.toString(keyType, valueType));
+ if (fileStatsExtractor == null) {
Review Comment:
   Lots of `fileStatsExtractor == null` checks look bad.
   I think we can have a `StatsProducer` to unify `StatsExtractor` and `StatsCollector`, to reduce caller complexity. I created https://issues.apache.org/jira/browse/FLINK-27543 for this.
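   For illustration only, a minimal sketch of what such a unifying `StatsProducer` could look like. This is not the FLINK-27543 design; the interface name, the `update`/`produce` methods, and the import paths are assumptions:

   ```java
   import org.apache.flink.core.fs.Path;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.store.file.stats.FieldStats;

   import java.io.IOException;

   /**
    * Hypothetical interface unifying the two stats code paths, so callers such as
    * KvFileWriter no longer need `fileStatsExtractor == null` branches.
    */
   interface StatsProducer {

       /**
        * Called for every written record. A collecting implementation would update a
        * FieldStatsCollector here; an extracting implementation would make this a no-op.
        */
       default void update(RowData row) {}

       /**
        * Called once the file is finished. A collecting implementation returns the
        * accumulated stats; an extracting implementation reads them back from the
        * written file via the format's stats extractor.
        */
       FieldStats[] produce(Path writtenFile) throws IOException;
   }
   ```

   With something along these lines, `KvFileWriter#write` could call `update` unconditionally for keys and values, and the null checks would collapse into the choice of implementation.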