JingsongLi commented on code in PR #316:
URL: https://github.com/apache/flink-table-store/pull/316#discussion_r995319127


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -190,12 +192,18 @@ public void commit(ManifestCommittable committable, 
Map<String, String> properti
         Long safeLatestSnapshotId = null;
         List<ManifestEntry> baseEntries = new ArrayList<>();
 
-        List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), FileKind.ADD);
-        List<ManifestEntry> compactChanges = new ArrayList<>();
-        compactChanges.addAll(collectChanges(committable.compactBefore(), 
FileKind.DELETE));
-        compactChanges.addAll(collectChanges(committable.compactAfter(), 
FileKind.ADD));
-
-        if (createEmptyCommit || !appendChanges.isEmpty()) {
+        List<ManifestEntry> appendMergeTree = new ArrayList<>();

Review Comment:
   No mergetree, `appendDataFiles`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##########
@@ -138,46 +133,31 @@ public void flushMemory() throws Exception {
                 trySyncLatestCompaction(true);
             }
 
-            // write changelog file
-            List<String> extraFiles = new ArrayList<>();
-            if (changelogProducer == ChangelogProducer.INPUT) {
-                SingleFileWriter<KeyValue, Void> writer = 
writerFactory.createChangelogFileWriter();
-                writer.write(memTable.rawIterator());
-                writer.close();
-                extraFiles.add(writer.path().getName());
-            }
-
             // write lsm level 0 file
-            try {
-                Iterator<KeyValue> iterator = 
memTable.mergeIterator(keyComparator, mergeFunction);
-                KeyValueDataFileWriter writer = 
writerFactory.createLevel0Writer();
-                writer.write(iterator);
-                writer.close();
-
-                // In theory, this fileMeta should contain statistics from 
both lsm file extra file.
-                // However for level 0 files, as we do not drop DELETE 
records, keys appear in one
-                // file will also appear in the other. So we just need to use 
statistics from one of
-                // them.
-                //
-                // For value count merge function, it is possible that we have 
changelog first
-                // adding one record then remove one record, but after merging 
this record will not
-                // appear in lsm file. This is OK because we can also skip 
this changelog.
-                DataFileMeta fileMeta = writer.result();
-                if (fileMeta == null) {
-                    for (String extraFile : extraFiles) {
-                        writerFactory.deleteFile(extraFile);
+            Iterator<KeyValue> iterator = 
memTable.mergeIterator(keyComparator, mergeFunction);
+            KeyValueDataFileWriter writer = 
writerFactory.createMergeTreeFileWriter(0);
+            writer.write(iterator);
+            writer.close();
+            DataFileMeta fileMeta = writer.result();
+
+            if (fileMeta != null) {
+                newFiles.add(fileMeta);
+                compactManager.addNewFile(fileMeta);
+
+                // write changelog file
+                if (changelogProducer == ChangelogProducer.INPUT) {
+                    try {
+                        KeyValueDataFileWriter changelogWriter =
+                                writerFactory.createChangelogFileWriter(0);
+                        changelogWriter.write(memTable.rawIterator());
+                        changelogWriter.close();
+                        changelogFiles.add(changelogWriter.result());
+                    } catch (Exception e) {
+                        // exception occurs, clean up written file
+                        writerFactory.deleteFile(fileMeta.fileName());

Review Comment:
   no need to delete file, already in `newFiles`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to