JingsongLi commented on code in PR #546:
URL: https://github.com/apache/flink-table-store/pull/546#discussion_r1113926517


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -477,12 +477,20 @@ private boolean tryCommitOnce(
         List<ManifestFileMeta> newMetas = new ArrayList<>();
         List<ManifestFileMeta> changelogMetas = new ArrayList<>();
         try {
+            long previousChangesRecordCount = 0L;
             if (latestSnapshot != null) {
+                List<ManifestFileMeta> previousManifests =
+                        latestSnapshot.dataManifests(manifestList);
                 // read all previous manifest files
-                oldMetas.addAll(latestSnapshot.readAllDataManifests(manifestList));
+                oldMetas.addAll(previousManifests);
+                previousChangesRecordCount =
+                        latestSnapshot.version() >= Snapshot.CURRENT_VERSION

Review Comment:
   Create a util method: `totalRecordCount(Snapshot snapshot)`.
   Also, maybe just check whether the field is null instead of comparing the snapshot version?
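
A minimal sketch of what such a helper could look like, to make the null-check suggestion concrete. `SnapshotStub` and `manifestRecordCounts` are hypothetical stand-ins, not the real `Snapshot` API, and the fallback of summing per-manifest counts is an assumption, since the diff above is cut off before the fallback branch.

```java
import java.util.Arrays;
import java.util.List;

class RecordCountSketch {

    /** Hypothetical stand-in for Snapshot; not the real class. */
    static final class SnapshotStub {
        /** Null when the snapshot was written before the field existed. */
        final Long baseRecordCount;
        /** Assumed per-manifest record counts, used only as a fallback. */
        final List<Long> manifestRecordCounts;

        SnapshotStub(Long baseRecordCount, List<Long> manifestRecordCounts) {
            this.baseRecordCount = baseRecordCount;
            this.manifestRecordCounts = manifestRecordCounts;
        }
    }

    /** Branches on the nullable field itself rather than on a version number. */
    static long totalRecordCount(SnapshotStub snapshot) {
        if (snapshot == null) {
            // No previous snapshot: nothing has been recorded yet.
            return 0L;
        }
        if (snapshot.baseRecordCount != null) {
            return snapshot.baseRecordCount;
        }
        // Assumed fallback for old snapshots: add up the manifest counts.
        long sum = 0L;
        for (long count : snapshot.manifestRecordCounts) {
            sum += count;
        }
        return sum;
    }

    public static void main(String[] args) {
        SnapshotStub legacy = new SnapshotStub(null, Arrays.asList(10L, 5L));
        SnapshotStub current = new SnapshotStub(42L, Arrays.asList(10L, 5L));
        System.out.println(totalRecordCount(null));
        System.out.println(totalRecordCount(legacy));
        System.out.println(totalRecordCount(current));
    }
}
```

Checking the field directly keeps the caller independent of how `Snapshot.CURRENT_VERSION` evolves; the commit path would then only need `previousChangesRecordCount = totalRecordCount(latestSnapshot)`.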



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java:
##########
@@ -124,6 +130,24 @@ public class Snapshot {
     @JsonProperty(FIELD_LOG_OFFSETS)
     private final Map<Integer, Long> logOffsets;
 
+    // record count of all changes from the previous snapshots
+    // null for table store <= 0.3
+    @JsonProperty(FIELD_BASE_RECORD_COUNT)
+    @Nullable
+    private final Long baseRecordCount;

Review Comment:
   Maybe just provide a `totalRecordCount` instead?
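
One possible reading of this suggestion, sketched under assumptions: each snapshot stores a single nullable running total, and the committer derives the next total from the previous one plus the records in the new commit. `SnapshotStub`, `nextTotal`, and `deltaRecordCount` are hypothetical names, and returning null when the previous total is unknown is a design choice of this sketch, not something stated in the PR.

```java
class TotalRecordCountSketch {

    /** Hypothetical stand-in for Snapshot; not the real class. */
    static final class SnapshotStub {
        final long id;
        /** Null for snapshots written before the field existed. */
        final Long totalRecordCount;

        SnapshotStub(long id, Long totalRecordCount) {
            this.id = id;
            this.totalRecordCount = totalRecordCount;
        }
    }

    /**
     * Computes the total for the next snapshot.
     * Returns null when the previous total is unknown and so cannot be extended.
     */
    static Long nextTotal(SnapshotStub previous, long deltaRecordCount) {
        if (previous == null) {
            // First snapshot: the total is just this commit's records.
            return deltaRecordCount;
        }
        if (previous.totalRecordCount == null) {
            return null;
        }
        return previous.totalRecordCount + deltaRecordCount;
    }

    public static void main(String[] args) {
        SnapshotStub first = new SnapshotStub(1L, nextTotal(null, 100L));
        SnapshotStub second = new SnapshotStub(2L, nextTotal(first, 25L));
        System.out.println(second.totalRecordCount);
    }
}
```

With a single stored total, readers would not need to combine a base count with a per-commit delta themselves.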



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
