JingsongLi commented on code in PR #460:
URL: https://github.com/apache/flink-table-store/pull/460#discussion_r1063181702


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java:
##########
@@ -166,67 +191,83 @@ public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             currentFirstWriteMs = null;
         }
 
-        if (snapshotIdentifierToCheck == null // wait for last forced full compaction to complete
-                && !writtenBuckets.isEmpty() // there should be something to compact
+        if (LOG.isDebugEnabled()) {
+            for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> checkpointIdAndBuckets :
+                    writtenBuckets.entrySet()) {
+                LOG.debug(
+                        "Written buckets for checkpoint #{} are:", checkpointIdAndBuckets.getKey());
+                for (Tuple2<BinaryRowData, Integer> bucket : checkpointIdAndBuckets.getValue()) {
+                    LOG.debug("  * partition {}, bucket {}", bucket.f0, bucket.f1);
+                }
+            }
+        }
+
+        if (!writtenBuckets.isEmpty() // there should be something to compact
                 && System.currentTimeMillis() - firstWriteMs.firstEntry().getValue()
                         >= fullCompactionThresholdMs // time without full compaction exceeds
         ) {
            doCompaction = true;
         }
 
         if (doCompaction) {
-            snapshotIdentifierToCheck = checkpointId;
-            Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
-            for (Set<Tuple2<BinaryRowData, Integer>> buckets : writtenBuckets.values()) {
-                for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
-                    if (compactedBuckets.contains(bucket)) {
-                        continue;
-                    }
-                    compactedBuckets.add(bucket);
-                    try {
-                        write.compact(bucket.f0, bucket.f1, true);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submit full compaction for checkpoint #{}", checkpointId);
             }
+            submitFullCompaction();
+            commitIdentifiersToCheck.add(checkpointId);
         }
 
         return super.prepareCommit(doCompaction, checkpointId);
     }
 
-    private Optional<Snapshot> findSnapshot(long identifierToCheck) {
-        // TODO We need a mechanism to do timeout recovery in case of snapshot expiration.
+    private void checkSuccessfulFullCompaction() {
         SnapshotManager snapshotManager = table.snapshotManager();
         Long latestId = snapshotManager.latestSnapshotId();
         if (latestId == null) {
-            return Optional.empty();
+            return;
         }
-
         Long earliestId = snapshotManager.earliestSnapshotId();
         if (earliestId == null) {
-            return Optional.empty();
+            return;
         }
 
-        // We must find the snapshot whose identifier is exactly `identifierToCheck`.
-        // We can't just compare with the latest snapshot identifier by this user (like what we do
-        // in `AbstractFileStoreWrite`), because even if the latest snapshot identifier is newer,
-        // compact changes may still be discarded due to commit conflicts.
         for (long id = latestId; id >= earliestId; id--) {
             Snapshot snapshot = snapshotManager.snapshot(id);
-            if (!snapshot.commitUser().equals(commitUser)) {
-                continue;
-            }
-            if (snapshot.commitIdentifier() == identifierToCheck) {
-                return Optional.of(snapshot);
-            } else if (snapshot.commitIdentifier() < identifierToCheck) {
-                // We're searching from new snapshots to old ones. So if we find an older
-                // identifier, we can make sure our target snapshot will never occur.
-                return Optional.empty();
+            if (snapshot.commitUser().equals(commitUser)
+                    && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                long commitIdentifier = snapshot.commitIdentifier();
+                if (commitIdentifiersToCheck.contains(commitIdentifier)) {
+                    // we found a full compaction snapshot
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Found full compaction snapshot #{} with identifier {}",
+                                id,
+                                commitIdentifier);
+                    }
+                    writtenBuckets.headMap(commitIdentifier, true).clear();
+                    firstWriteMs.headMap(commitIdentifier, true).clear();
+                    commitIdentifiersToCheck.headSet(commitIdentifier).clear();
+                    break;
+                }
             }
         }
+    }
 
-        return Optional.empty();
+    private void submitFullCompaction() {
+        Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
+        for (Set<Tuple2<BinaryRowData, Integer>> buckets : writtenBuckets.values()) {

Review Comment:
   Minor: the map iteration could use `forEach`:
   ```
   writtenBuckets.forEach(
       (cp, buckets) -> {
           for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
               writtenBucketList.add(Tuple3.of(cp, bucket.f0, bucket.f1));
           }
       });
   ```
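   For illustration, a minimal sketch (not part of the PR) of how this `forEach` style could replace the deduplicating loop in `submitFullCompaction` from the diff above; `writtenBuckets` and `write` are the fields shown in the diff, and the rest is assumed context from the enclosing class:
   ```
   private void submitFullCompaction() {
       // Compact each (partition, bucket) pair at most once across all checkpoints.
       Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
       writtenBuckets.forEach(
               (checkpointId, buckets) -> {
                   for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
                       // Set#add returns false for a duplicate, replacing the
                       // explicit contains() check in the original loop.
                       if (compactedBuckets.add(bucket)) {
                           try {
                               write.compact(bucket.f0, bucket.f1, true);
                           } catch (Exception e) {
                               throw new RuntimeException(e);
                           }
                       }
                   }
               });
   }
   ```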



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FullChangelogStoreSinkWrite.java:
##########
@@ -166,67 +191,83 @@ public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
             currentFirstWriteMs = null;
         }
 
-        if (snapshotIdentifierToCheck == null // wait for last forced full compaction to complete
-                && !writtenBuckets.isEmpty() // there should be something to compact
+        if (LOG.isDebugEnabled()) {
+            for (Map.Entry<Long, Set<Tuple2<BinaryRowData, Integer>>> checkpointIdAndBuckets :
+                    writtenBuckets.entrySet()) {
+                LOG.debug(
+                        "Written buckets for checkpoint #{} are:", checkpointIdAndBuckets.getKey());
+                for (Tuple2<BinaryRowData, Integer> bucket : checkpointIdAndBuckets.getValue()) {
+                    LOG.debug("  * partition {}, bucket {}", bucket.f0, bucket.f1);
+                }
+            }
+        }
+
+        if (!writtenBuckets.isEmpty() // there should be something to compact
                 && System.currentTimeMillis() - firstWriteMs.firstEntry().getValue()
                         >= fullCompactionThresholdMs // time without full compaction exceeds
         ) {
             doCompaction = true;
         }
 
         if (doCompaction) {
-            snapshotIdentifierToCheck = checkpointId;
-            Set<Tuple2<BinaryRowData, Integer>> compactedBuckets = new HashSet<>();
-            for (Set<Tuple2<BinaryRowData, Integer>> buckets : writtenBuckets.values()) {
-                for (Tuple2<BinaryRowData, Integer> bucket : buckets) {
-                    if (compactedBuckets.contains(bucket)) {
-                        continue;
-                    }
-                    compactedBuckets.add(bucket);
-                    try {
-                        write.compact(bucket.f0, bucket.f1, true);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submit full compaction for checkpoint #{}", checkpointId);
             }
+            submitFullCompaction();
+            commitIdentifiersToCheck.add(checkpointId);
         }
 
         return super.prepareCommit(doCompaction, checkpointId);
     }
 
-    private Optional<Snapshot> findSnapshot(long identifierToCheck) {
-        // TODO We need a mechanism to do timeout recovery in case of snapshot expiration.
+    private void checkSuccessfulFullCompaction() {
         SnapshotManager snapshotManager = table.snapshotManager();
         Long latestId = snapshotManager.latestSnapshotId();
         if (latestId == null) {
-            return Optional.empty();
+            return;
         }
-
         Long earliestId = snapshotManager.earliestSnapshotId();
         if (earliestId == null) {
-            return Optional.empty();
+            return;
         }
 
-        // We must find the snapshot whose identifier is exactly `identifierToCheck`.
-        // We can't just compare with the latest snapshot identifier by this user (like what we do
-        // in `AbstractFileStoreWrite`), because even if the latest snapshot identifier is newer,
-        // compact changes may still be discarded due to commit conflicts.
         for (long id = latestId; id >= earliestId; id--) {
             Snapshot snapshot = snapshotManager.snapshot(id);
-            if (!snapshot.commitUser().equals(commitUser)) {
-                continue;
-            }
-            if (snapshot.commitIdentifier() == identifierToCheck) {
-                return Optional.of(snapshot);
-            } else if (snapshot.commitIdentifier() < identifierToCheck) {
-                // We're searching from new snapshots to old ones. So if we find an older
-                // identifier, we can make sure our target snapshot will never occur.
-                return Optional.empty();
+            if (snapshot.commitUser().equals(commitUser)
+                    && snapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+                long commitIdentifier = snapshot.commitIdentifier();
+                if (commitIdentifiersToCheck.contains(commitIdentifier)) {
+                    // we found a full compaction snapshot

Review Comment:
   Add more comments to explain why we need to find a full compaction snapshot
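   For reference, the rationale was partly captured by the comment this diff removes; a hedged sketch of the kind of explanation that could sit above this check, with the wording adapted from that removed comment rather than supplied by the PR author:
   ```
   // We must find a COMPACT snapshot whose identifier we recorded in
   // commitIdentifiersToCheck. Comparing only against the latest snapshot
   // identifier by this user (as `AbstractFileStoreWrite` does) is not
   // enough: even if that identifier is newer, the compact changes may
   // still have been discarded due to commit conflicts. Only a matching
   // COMPACT snapshot proves the forced full compaction succeeded, so the
   // written buckets up to that identifier can safely be forgotten.
   if (commitIdentifiersToCheck.contains(commitIdentifier)) {
   ```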


