JingsongLi commented on code in PR #298:
URL: https://github.com/apache/flink-table-store/pull/298#discussion_r979506509


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -138,34 +137,45 @@ public void flushMemory() throws Exception {
                 // stop writing, wait for compaction finished
                 trySyncLatestCompaction(true);
             }
+
+            // write changelog file
             List<String> extraFiles = new ArrayList<>();
             if (changelogProducer == ChangelogProducer.INPUT) {
-                extraFiles.add(
-                        dataFileWriter
-                                .writeLevel0Changelog(
-                                        CloseableIterator.adapterForIterator(
-                                                memTable.rawIterator()))
-                                .getName());
+                SingleFileWriter<KeyValue, Void> writer = 
writerFactory.createExtraFileWriter();
+                writer.write(memTable.rawIterator());
+                writer.close();
+                extraFiles.add(writer.path().getName());
             }
-            boolean success = false;
+
+            // write lsm level 0 file
             try {
                 Iterator<KeyValue> iterator = 
memTable.mergeIterator(keyComparator, mergeFunction);
-                success =
-                        dataFileWriter
-                                
.writeLevel0(CloseableIterator.adapterForIterator(iterator))
-                                .map(
-                                        file -> {
-                                            DataFileMeta fileMeta = 
file.copy(extraFiles);
-                                            newFiles.add(fileMeta);
-                                            
compactManager.addNewFile(fileMeta);
-                                            return true;
-                                        })
-                                .orElse(false);
-            } finally {
-                if (!success) {
-                    extraFiles.forEach(dataFileWriter::delete);
+                KeyValueDataFileWriter writer = 
writerFactory.createLevel0Writer();
+                writer.write(iterator);
+                writer.close();
+
+                // In theory, this fileMeta should contain statistics from 
both lsm file extra file.
+                // However for level 0 files, as we do not drop DELETE 
records, keys appear in one
+                // file will also appear in the other. So we just need to use 
statistics from one of
+                // them.
+                //
+                // For value count merge function, it is possible that we have 
changelog first
+                // adding one record then remove one record, but after merging 
this record will not
+                // appear in lsm file. This is OK because we can also skip 
this changelog.
+                DataFileMeta fileMeta = writer.result();
+                if (fileMeta != null) {

Review Comment:
   `if (fileMeta == null)`, `extraFiles` is not be deleted?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileWriterFactory.java:
##########
@@ -149,16 +96,35 @@ private KeyValueDataFileWriter createDataFileWriter(int 
level) {
                 level);
     }
 
-    public void delete(DataFileMeta file) {
-        delete(file.fileName());
+    public SingleFileWriter<KeyValue, Void> createExtraFileWriter() {

Review Comment:
   `createChangelogFileWriter`? we may have more extra files in future.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueFileWriterFactory.java:
##########
@@ -52,20 +44,19 @@ public class DataFileWriter {
     private final DataFilePathFactory pathFactory;
     private final long suggestedFileSize;
 
-    private DataFileWriter(
+    private KeyValueFileWriterFactory(
             long schemaId,
             RowType keyType,
             RowType valueType,
             BulkWriter.Factory<RowData> writerFactory,
-            @Nullable FileStatsExtractor fileStatsExtractor,
+            FileStatsExtractor fileStatsExtractor,

Review Comment:
   why remove nullable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to