Copilot commented on code in PR #2208:
URL: https://github.com/apache/fluss/pull/2208#discussion_r2634336512


##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java:
##########
@@ -17,68 +17,57 @@
 
 package org.apache.fluss.lake.committer;
 
-import org.apache.fluss.utils.types.Tuple2;
-
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * The lake already committed snapshot, containing the lake snapshot id and 
the bucket end offsets
- * in this snapshot.
+ * The lake already committed snapshot, containing the lake snapshot id and 
the properties stored in
+ * this snapshot.
  */
 public class CommittedLakeSnapshot {
 
     private final long lakeSnapshotId;
-    // <partition_id, bucket> -> log offset, partition_id will be null if it's 
not a
-    // partition bucket
-    private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new 
HashMap<>();
 
-    public CommittedLakeSnapshot(long lakeSnapshotId) {
+    private final Map<String, String> snapshotProperties;
+
+    public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> 
snapshotProperties) {
         this.lakeSnapshotId = lakeSnapshotId;
+        this.snapshotProperties = snapshotProperties;
     }
 
     public long getLakeSnapshotId() {
         return lakeSnapshotId;
     }
 
-    public void addBucket(int bucketId, long offset) {
-        logEndOffsets.put(Tuple2.of(null, bucketId), offset);
-    }
-
-    public void addPartitionBucket(Long partitionId, int bucketId, long 
offset) {
-        logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
-    }
-
-    public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
-        return logEndOffsets;
+    public Map<String, String> getSnapshotProperties() {
+        return snapshotProperties;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) {
             return true;
         }
-        if (o == null || getClass() != o.getClass()) {
+        if (!(o instanceof CommittedLakeSnapshot)) {

Review Comment:
   Inconsistent equals implementation: The equals method changed from using 
getClass() check to instanceof check. While instanceof is generally preferred 
for equals implementations to allow for inheritance, this is a style change 
that should be applied consistently across the codebase. Verify this aligns 
with the project's coding standards.
   ```suggestion
           if (o == null || getClass() != o.getClass()) {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -312,10 +317,36 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been 
committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
+            String logOffsetsProperty =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            if (logOffsetsProperty == null) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Missing required log offsets property '%s' in 
lake snapshot %d for table: {tablePath=%s, tableId=%d}. "
+                                        + "This property is required to commit 
the missing snapshot to Fluss. "
+                                        + "The snapshot may have been created 
by an older version of Fluss that did not store this information, "
+                                        + "or the snapshot properties may be 
corrupted.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                missingCommittedSnapshot.getLakeSnapshotId(),
+                                tablePath,
+                                tableId));

Review Comment:
   Error message formatting issue: The error message uses placeholder syntax 
{tablePath=%s, tableId=%d} but these are being replaced by String.format with 
%s and %d respectively. The curly braces appear to be decorative rather than 
actual placeholders, which could be confusing. Consider either removing the 
curly braces or using a consistent format like 'tablePath=%s, tableId=%d' 
without the braces.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -312,10 +317,36 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been 
committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
+            String logOffsetsProperty =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);

Review Comment:
   Potential NullPointerException: The method getSnapshotProperties() could 
return null, but there's no null check before calling get() on it. Consider 
adding a null check for snapshotProperties before attempting to retrieve the 
property value, similar to how other lake committers check for null properties.
   ```suggestion
               Map<String, String> snapshotProperties =
                       missingCommittedSnapshot.getSnapshotProperties();
               if (snapshotProperties == null) {
                   throw new IllegalStateException(
                           String.format(
                                   "Missing snapshot properties in lake 
snapshot %d for table: {tablePath=%s, tableId=%d}. "
                                           + "These properties are required to 
commit the missing snapshot to Fluss. "
                                           + "The snapshot may have been 
created by an older version of Fluss that did not store this information, "
                                           + "or the snapshot properties may be 
corrupted.",
                                   
missingCommittedSnapshot.getLakeSnapshotId(), tablePath, tableId));
               }
   
               String logOffsetsProperty =
                       
snapshotProperties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -312,10 +317,36 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been 
committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
+            String logOffsetsProperty =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            if (logOffsetsProperty == null) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Missing required log offsets property '%s' in 
lake snapshot %d for table: {tablePath=%s, tableId=%d}. "
+                                        + "This property is required to commit 
the missing snapshot to Fluss. "
+                                        + "The snapshot may have been created 
by an older version of Fluss that did not store this information, "
+                                        + "or the snapshot properties may be 
corrupted.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                missingCommittedSnapshot.getLakeSnapshotId(),
+                                tablePath,
+                                tableId));
+            }
+
+            Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+            for (JsonNode node : OBJECT_MAPPER.readTree(logOffsetsProperty)) {
+                BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+                TableBucket tableBucket =
+                        new TableBucket(
+                                tableId, bucketOffset.getPartitionId(), 
bucketOffset.getBucket());
+                logEndOffsets.put(tableBucket, bucketOffset.getLogOffset());

Review Comment:
   Missing exception handling: The OBJECT_MAPPER.readTree() call on line 339 
can throw JsonProcessingException, but this is not explicitly handled. While 
the method signature throws Exception which would cover this, consider adding 
more specific error handling to provide a clearer error message if the JSON 
parsing fails, indicating that the snapshot properties may be corrupted.
   ```suggestion
               try {
                   for (JsonNode node : 
OBJECT_MAPPER.readTree(logOffsetsProperty)) {
                       BucketOffset bucketOffset =
                               BucketOffsetJsonSerde.INSTANCE.deserialize(node);
                       TableBucket tableBucket =
                               new TableBucket(
                                       tableId,
                                       bucketOffset.getPartitionId(),
                                       bucketOffset.getBucket());
                       logEndOffsets.put(tableBucket, 
bucketOffset.getLogOffset());
                   }
               } catch (IOException e) {
                   throw new IllegalStateException(
                           String.format(
                                   "Failed to parse log offsets property '%s' 
for lake snapshot %d "
                                           + "for table: {tablePath=%s, 
tableId=%d}. "
                                           + "The snapshot properties may be 
corrupted or produced by an incompatible Fluss version.",
                                   FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
                                   missingCommittedSnapshot.getLakeSnapshotId(),
                                   tablePath,
                                   tableId),
                           e);
   ```



##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java:
##########
@@ -17,68 +17,57 @@
 
 package org.apache.fluss.lake.committer;
 
-import org.apache.fluss.utils.types.Tuple2;
-
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * The lake already committed snapshot, containing the lake snapshot id and 
the bucket end offsets
- * in this snapshot.
+ * The lake already committed snapshot, containing the lake snapshot id and 
the properties stored in
+ * this snapshot.
  */
 public class CommittedLakeSnapshot {
 
     private final long lakeSnapshotId;
-    // <partition_id, bucket> -> log offset, partition_id will be null if it's 
not a
-    // partition bucket
-    private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new 
HashMap<>();
 
-    public CommittedLakeSnapshot(long lakeSnapshotId) {
+    private final Map<String, String> snapshotProperties;
+
+    public CommittedLakeSnapshot(long lakeSnapshotId, Map<String, String> 
snapshotProperties) {
         this.lakeSnapshotId = lakeSnapshotId;
+        this.snapshotProperties = snapshotProperties;
     }

Review Comment:
   Missing null validation: The snapshotProperties parameter is not validated 
for null before being stored. Since this property is essential for the class 
functionality (as evidenced by its usage in TieringCommitOperator), consider 
adding a null check or documenting that null is an acceptable value. The class 
javadoc should clarify whether null properties are allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to