luoyuxia commented on code in PR #1451:
URL: https://github.com/apache/fluss/pull/1451#discussion_r2253363061


##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -196,15 +196,36 @@ private Committable commitWriteResults(
             // to committable
             Committable committable = lakeCommitter.toCommittable(writeResults);
             // before commit to lake, check fluss not missing any lake snapshot commited by fluss
+
+            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+            Map<Long, String> partitionNameById = null;
+            Map<String, Long> partitionIdByName = null;
+            if (tableInfo.isPartitioned()) {
+                partitionNameById =
+                        admin.listPartitionInfos(tablePath).get().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                PartitionInfo::getPartitionId,
+                                                PartitionInfo::getPartitionName));
+
+                partitionIdByName =
+                        partitionNameById.entrySet().stream()
+                                .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+            }
+
             checkFlussNotMissingLakeSnapshot(
                     tablePath,
                     lakeCommitter,
                     committable,
                     flussCurrentLakeSnapshot == null
                             ? null
-                            : flussCurrentLakeSnapshot.getSnapshotId());
+                            : flussCurrentLakeSnapshot.getSnapshotId(),
+                    partitionIdByName,

Review Comment:
   `partitionIdByName` is not used currently.
   If Paimon already stores the partition name in the property, we can get the
   partition name from that property instead of building it with the code above:
   ```
   TableInfo tableInfo = admin.getTableInfo(tablePath).get();
   Map<Long, String> partitionNameById = null;
   Map<String, Long> partitionIdByName = null;
   if (tableInfo.isPartitioned()) {
       partitionNameById =
               admin.listPartitionInfos(tablePath).get().stream()
                       .collect(
                               Collectors.toMap(
                                       PartitionInfo::getPartitionId,
                                       PartitionInfo::getPartitionName));

       partitionIdByName =
               partitionNameById.entrySet().stream()
                       .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
   }
   ```
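
   A minimal sketch of the property-based alternative, for illustration only. It
   assumes the lake snapshot exposes its properties as a plain `Map<String, String>`
   and that the partition name sits under a single key. The class name, the
   `partitionName` helper, and the `"partition-name"` key are all hypothetical and
   are not taken from the Fluss or Paimon code:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   public class PartitionNameFromProperty {
       // Hypothetical property key; the key actually written by the tiering
       // service (if any) may be different.
       static final String PARTITION_NAME_KEY = "partition-name";

       /**
        * Returns the partition name recorded in the given properties, or an
        * empty Optional when the key is absent (e.g. a non-partitioned table).
        */
       static Optional<String> partitionName(Map<String, String> properties) {
           return Optional.ofNullable(properties.get(PARTITION_NAME_KEY));
       }

       public static void main(String[] args) {
           Map<String, String> properties = new HashMap<>();
           properties.put(PARTITION_NAME_KEY, "2024-01-01");
           // No admin.listPartitionInfos(...) round trip is needed here.
           System.out.println(partitionName(properties).orElse("<none>"));
       }
   }
   ```

   With a lookup like this, neither `partitionNameById` nor `partitionIdByName`
   would need to be built before calling `checkFlussNotMissingLakeSnapshot`.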



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
