JingsongLi commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923160171


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
-        Set<ManifestEntry.Identifier> removedFiles =
-                changes.stream()
-                        .filter(e -> e.kind().equals(FileKind.DELETE))
-                        .map(ManifestEntry::identifier)
-                        .collect(Collectors.toSet());
-        if (removedFiles.isEmpty()) {
-            // early exit for append only changes
-            return;
-        }
-
         List<BinaryRowData> changedPartitions =
                 changes.stream()
                         .map(ManifestEntry::partition)
                         .distinct()
                         .collect(Collectors.toList());
+        List<ManifestEntry> currentEntries;
         try {
-            for (ManifestEntry entry :
+            currentEntries =
                     scan.withSnapshot(snapshotId)
                             .withPartitionFilter(changedPartitions)
                             .plan()
-                            .files()) {
-                removedFiles.remove(entry.identifier());
-            }
+                            .files();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot determine if conflicts exist.", e);
         }
 
+        noConflictsForDeletedFilesOrFail(currentEntries, changes);
+        noConflictsForLsmOrFail(currentEntries, changes);
+    }
+
+    private void noConflictsForDeletedFilesOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        // check that files to be removed are not yet removed
+        for (ManifestEntry entry : currentEntries) {
+            removedFiles.remove(entry.identifier());
+        }
         if (!removedFiles.isEmpty()) {
             throw new RuntimeException(
-                    "Conflicts detected on:\n"
+                    "File deletion conflicts detected! Give up committing compact changes. Conflict files are:\n"
                             + removedFiles.stream()
-                                    .map(
-                                            i ->
-                                                    pathFactory.getPartitionString(i.partition)
-                                                            + ", bucket "
-                                                            + i.bucket
-                                                            + ", level "
-                                                            + i.level
-                                                            + ", file "
-                                                            + i.fileName)
+                                    .map(i -> i.toString(pathFactory))
                                     .collect(Collectors.joining("\n")));
         }
     }
 
+    private void noConflictsForLsmOrFail(
+            List<ManifestEntry> currentEntries, List<ManifestEntry> changes) {
+        if (keyComparator == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries = new ArrayList<>();
+        allEntries.addAll(currentEntries);
+        allEntries.addAll(changes);
+        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
+        for (ManifestEntry entry : ManifestEntry.mergeManifestEntries(allEntries)) {
+            int level = entry.file().level();
+            if (level >= 1) {
+                levels.compute(
+                                new LevelIdentifier(entry.partition(), entry.bucket(), level),
+                                (lv, list) -> list == null ? new ArrayList<>() : list)
+                        .add(entry);
+            }
+        }
+
+        // check for all LSM level >= 1, key ranges of files do not intersect
+        for (List<ManifestEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), b.file().minKey()));
+            for (int i = 0; i + 1 < entries.size(); i++) {
+                ManifestEntry a = entries.get(i);
+                ManifestEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.file().maxKey(), b.file().minKey()) >= 0) {

Review Comment:
   Ah... Yes, it is >=



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to