JingsongLi commented on code in PR #298:
URL: https://github.com/apache/flink-table-store/pull/298#discussion_r979506509
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -138,34 +137,45 @@ public void flushMemory() throws Exception {
             // stop writing, wait for compaction finished
             trySyncLatestCompaction(true);
         }
+
+        // write changelog file
         List<String> extraFiles = new ArrayList<>();
         if (changelogProducer == ChangelogProducer.INPUT) {
-            extraFiles.add(
-                    dataFileWriter
-                            .writeLevel0Changelog(
-                                    CloseableIterator.adapterForIterator(
-                                            memTable.rawIterator()))
-                            .getName());
+            SingleFileWriter<KeyValue, Void> writer = writerFactory.createExtraFileWriter();
+            writer.write(memTable.rawIterator());
+            writer.close();
+            extraFiles.add(writer.path().getName());
         }
-        boolean success = false;
+
+        // write lsm level 0 file
         try {
             Iterator<KeyValue> iterator = memTable.mergeIterator(keyComparator, mergeFunction);
-            success =
-                    dataFileWriter
-                            .writeLevel0(CloseableIterator.adapterForIterator(iterator))
-                            .map(
-                                    file -> {
-                                        DataFileMeta fileMeta = file.copy(extraFiles);
-                                        newFiles.add(fileMeta);
-                                        compactManager.addNewFile(fileMeta);
-                                        return true;
-                                    })
-                            .orElse(false);
-        } finally {
-            if (!success) {
-                extraFiles.forEach(dataFileWriter::delete);
+            KeyValueDataFileWriter writer = writerFactory.createLevel0Writer();
+            writer.write(iterator);
+            writer.close();
+
+            // In theory, this fileMeta should contain statistics from both the lsm file and
+            // the extra file. However, for level 0 files, as we do not drop DELETE records,
+            // keys appearing in one file will also appear in the other, so we just need to
+            // use statistics from one of them.
+            //
+            // For the value count merge function, it is possible that the changelog first
+            // adds one record and then removes it, but after merging this record will not
+            // appear in the lsm file. This is OK because we can also skip this changelog.
+            DataFileMeta fileMeta = writer.result();
+            if (fileMeta != null) {
Review Comment:
If `fileMeta == null`, `extraFiles` will not be deleted?
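The cleanup concern can be made concrete with a minimal sketch. The names below (`FlushSketch`, `flush`, `"changelog-0"`) are hypothetical, not the actual Flink Table Store API: it only illustrates that extra (changelog) files written before the level-0 write must be cleaned up when no level-0 file meta is produced, which is what the removed `success`/`finally` branch used to guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Minimal sketch with hypothetical names (not the real Flink Table Store API)
 * of the concern raised above: changelog ("extra") files are written before the
 * LSM level-0 file, so if the level-0 write yields no file meta, those extra
 * files must be cleaned up or they are orphaned on disk.
 */
public class FlushSketch {

    /** Simulates flushMemory; returns the list of files deleted during cleanup. */
    static List<String> flush(Supplier<String> level0Write) {
        List<String> extraFiles = new ArrayList<>();
        extraFiles.add("changelog-0"); // changelog file written first
        List<String> deleted = new ArrayList<>();
        boolean success = false;
        try {
            String fileMeta = level0Write.get(); // may be null: no level-0 file produced
            if (fileMeta != null) {
                success = true;
                // would register fileMeta.copy(extraFiles) with newFiles / compactManager
            }
        } finally {
            if (!success) {
                // without this branch the changelog file would leak
                deleted.addAll(extraFiles);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        System.out.println(flush(() -> null));     // prints [changelog-0]
        System.out.println(flush(() -> "data-0")); // prints []
    }
}
```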
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileWriterFactory.java:
##########
@@ -149,16 +96,35 @@ private KeyValueDataFileWriter createDataFileWriter(int level) {
                 level);
     }
-    public void delete(DataFileMeta file) {
-        delete(file.fileName());
+    public SingleFileWriter<KeyValue, Void> createExtraFileWriter() {
Review Comment:
Rename to `createChangelogFileWriter`? We may have more kinds of extra files in the future.
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileWriterFactory.java:
##########
@@ -52,20 +44,19 @@ public class DataFileWriter {
     private final DataFilePathFactory pathFactory;
     private final long suggestedFileSize;
-    private DataFileWriter(
+    private KeyValueFileWriterFactory(
             long schemaId,
             RowType keyType,
             RowType valueType,
             BulkWriter.Factory<RowData> writerFactory,
-            @Nullable FileStatsExtractor fileStatsExtractor,
+            FileStatsExtractor fileStatsExtractor,
Review Comment:
Why remove `@Nullable`?
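The `@Nullable` question can be illustrated with a toy sketch. All names here (`StatsSketch`, `StatsExtractor`, `statsFor`) are hypothetical and not the real `KeyValueFileWriterFactory` API: the point is only that a stats extractor may legitimately be absent for some file formats, in which case the writer must fall back to collecting statistics itself, so dropping the nullability marker hides a real code path.

```java
/**
 * Toy sketch (hypothetical names, not the real KeyValueFileWriterFactory) of why
 * a stats extractor field may be nullable: some file formats cannot extract
 * statistics from written files, and the writer then falls back to collecting
 * stats in memory while writing.
 */
public class StatsSketch {

    interface StatsExtractor {
        String extract(String file);
    }

    // may be null when the underlying format does not support stats extraction
    private final StatsExtractor extractor;

    StatsSketch(StatsExtractor extractor) { // effectively a @Nullable parameter
        this.extractor = extractor;
    }

    String statsFor(String file) {
        if (extractor == null) {
            return "collected-in-memory"; // fallback path for null extractor
        }
        return extractor.extract(file);
    }

    public static void main(String[] args) {
        System.out.println(new StatsSketch(null).statsFor("f0"));          // prints collected-in-memory
        System.out.println(new StatsSketch(f -> "from-" + f).statsFor("f0")); // prints from-f0
    }
}
```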
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]