Copilot commented on code in PR #2454:
URL: https://github.com/apache/fluss/pull/2454#discussion_r2895425653


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -578,17 +582,28 @@ static LakeTieringHeartbeatRequest tieringTableHeartBeat(
                         toPbHeartbeatReqForTable(tieringTableEpochs, 
coordinatorEpoch));
             }
             if (!finishedTables.isEmpty()) {
-                Map<Long, Long> finishTieringEpochs = new HashMap<>();
                 Set<Long> forceFinishedTables = new HashSet<>();
+                List<PbHeartbeatReqForTable> finishedTableReqs = new 
ArrayList<>();
                 finishedTables.forEach(
                         (tableId, tieringFinishInfo) -> {
-                            finishTieringEpochs.put(tableId, 
tieringFinishInfo.tieringEpoch);
                             if (tieringFinishInfo.isForceFinished) {
                                 forceFinishedTables.add(tableId);
                             }
+                            finishedTableReqs.add(
+                                    new PbHeartbeatReqForTable()
+                                            .setTableId(tableId)
+                                            
.setCoordinatorEpoch(coordinatorEpoch)
+                                            
.setTieringEpoch(tieringFinishInfo.tieringEpoch)
+                                            .setLakeTieringStats(
+                                                    new PbLakeTieringStats()
+                                                            .setFileSize(
+                                                                    
tieringFinishInfo.stats
+                                                                            
.getFileSize())
+                                                            .setRecordCount(
+                                                                    
tieringFinishInfo.stats
+                                                                            
.getRecordCount())));

Review Comment:
   `TieringFinishInfo.stats` can be null (e.g., if a caller constructs 
`FinishedTieringEvent` with a null stats value), but this code unconditionally 
dereferences it when building `PbLakeTieringStats`, which can crash the 
enumerator during heartbeat building. Default null stats to 
`TieringStats.UNKNOWN` (either when constructing `TieringFinishInfo` / 
`FinishedTieringEvent`, or guard here before calling getters).



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java:
##########
@@ -141,7 +153,26 @@ public LakeCommitResult commit(
                     snapshotId = rewriteCommitSnapshotId;
                 }
             }
-            return LakeCommitResult.committedIsReadable(snapshotId);
+
+            // collect cumulative table stats from the committed snapshot's 
summary.
+            // Use the specific committed snapshotId rather than 
currentSnapshot() to avoid
+            // reading a different snapshot in case of concurrent commits.
+            long totalFileSize = -1L;
+            long totalRecordCount = -1L;
+            Snapshot committedSnapshot = icebergTable.snapshot(snapshotId);
+            if (committedSnapshot != null && committedSnapshot.summary() != 
null) {
+                Map<String, String> summary = committedSnapshot.summary();
+                String sizeStr = summary.get(SNAPSHOT_TOTAL_FILE_SIZE);
+                String countStr = summary.get(SNAPSHOT_TOTAL_RECORDS);
+                if (sizeStr != null) {
+                    totalFileSize = Long.parseLong(sizeStr);
+                }
+                if (countStr != null) {
+                    totalRecordCount = Long.parseLong(countStr);

Review Comment:
   Parsing Iceberg snapshot summary values with `Long.parseLong` can throw 
`NumberFormatException` if the summary is missing, corrupted, or uses a 
non-numeric format, which would fail the whole commit. Treat stats collection 
as best-effort here as well: catch parse errors (and optionally log a warning) 
and keep the values as `-1` instead of failing the commit.
   ```suggestion
                       try {
                           totalFileSize = Long.parseLong(sizeStr);
                       } catch (NumberFormatException nfe) {
                           LOG.warn(
                                   "Failed to parse snapshot total file size 
'{}' for snapshot {}. Using default -1.",
                                   sizeStr,
                                   snapshotId,
                                   nfe);
                       }
                   }
                   if (countStr != null) {
                       try {
                           totalRecordCount = Long.parseLong(countStr);
                       } catch (NumberFormatException nfe) {
                           LOG.warn(
                                   "Failed to parse snapshot total record count 
'{}' for snapshot {}. Using default -1.",
                                   countStr,
                                   snapshotId,
                                   nfe);
                       }
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -165,10 +188,23 @@ protected LakeTableTieringManager(
         this.tableTierEpoch = new HashMap<>();
         this.tableLastTieredTime = new HashMap<>();
         this.delayedTieringByTableId = new HashMap<>();
+        this.tableLastTieringDuration = new HashMap<>();
+        this.tableTieringFailureCount = new HashMap<>();
+        this.tableLastLakeFileSize = new HashMap<>();
+        this.tableLastLakeRecordCount = new HashMap<>();
+        this.tieringMetricGroup = lakeTieringMetricGroup;
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, 
pendingTieringTables::size);
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, 
liveTieringTableIds::size);

Review Comment:
   Docs list a coordinator `lakeTiering.failuresTotal` metric, and the manager 
maintains `globalTieringFailureCount`, but no metric is registered for it (and 
there is no corresponding `MetricNames` constant). Register a gauge/counter for 
the global failure total here and add the metric name constant so the 
documented metric is actually exposed.
   ```suggestion
                   MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, 
liveTieringTableIds::size);
           tieringMetricGroup.gauge(
                   MetricNames.LAKE_TIERING_FAILURES_TOTAL, () -> 
globalTieringFailureCount);
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -647,17 +662,26 @@ private static class TieringFinishInfo {
          */
         boolean isForceFinished;
 
+        /** Stats collected during this tiering round. */
+        TieringStats stats;
+
         public static TieringFinishInfo from(long tieringEpoch) {
-            return new TieringFinishInfo(tieringEpoch, false);
+            return new TieringFinishInfo(tieringEpoch, false, 
TieringStats.UNKNOWN);
         }
 
         public static TieringFinishInfo from(long tieringEpoch, boolean 
isForceFinished) {
-            return new TieringFinishInfo(tieringEpoch, isForceFinished);
+            return new TieringFinishInfo(tieringEpoch, isForceFinished, 
TieringStats.UNKNOWN);
+        }
+
+        public static TieringFinishInfo from(
+                long tieringEpoch, boolean isForceFinished, TieringStats 
stats) {
+            return new TieringFinishInfo(tieringEpoch, isForceFinished, stats);
         }
 
-        private TieringFinishInfo(long tieringEpoch, boolean isForceFinished) {
+        private TieringFinishInfo(long tieringEpoch, boolean isForceFinished, 
TieringStats stats) {
             this.tieringEpoch = tieringEpoch;
             this.isForceFinished = isForceFinished;
+            this.stats = stats;

Review Comment:
   The `TieringFinishInfo` factory/constructor accepts a potentially null 
`TieringStats` and stores it directly. Since downstream code assumes `stats` is 
non-null (to build protobuf messages), enforce non-null here (e.g., replace 
null with `TieringStats.UNKNOWN` or validate with a null-check).
   ```suggestion
               this.stats = (stats == null) ? TieringStats.UNKNOWN : stats;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java:
##########
@@ -26,11 +26,23 @@ public class FinishedTieringEvent implements SourceEvent {
 
     private final long tableId;
 
-    public FinishedTieringEvent(long tableId) {
+    /** Statistics collected during this tiering round. */
+    private final TieringStats stats;
+
+    public FinishedTieringEvent(long tableId, TieringStats stats) {
         this.tableId = tableId;
+        this.stats = stats;

Review Comment:
   `FinishedTieringEvent` stores the provided `stats` reference as-is. If any 
caller passes null, it will propagate and can cause NPEs when building 
heartbeats. Consider normalizing null to `TieringStats.UNKNOWN` (or rejecting 
null) in this constructor to keep the event contract safe.
   ```suggestion
           this.stats = stats != null ? stats : TieringStats.UNKNOWN;
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java:
##########
@@ -153,6 +183,40 @@ public LakeCommitResult commit(
         }
     }
 
+    /**
+     * Computes cumulative table stats from the snapshot's base manifest list.
+     *
+     * <p>The base manifest list covers all live data files in the snapshot 
(main branch only,
+     * expired snapshots excluded). ADD entries increase the total; DELETE 
entries (produced by
+     * compaction) decrease it, yielding the net live file size and physical 
row count.
+     *
+     * <p>For primary-key tables the physical row count may include 
un-compacted delete rows at L0
+     * before a full compaction is completed.
+     *
+     * @return {@code long[]{totalFileSize, totalRowCount}}
+     */
+    @VisibleForTesting
+    static long[] computeTableStats(FileStore<?> store, Snapshot snapshot) {
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        List<ManifestFileMeta> manifestFileMetas = 
manifestList.readDataManifests(snapshot);
+        long totalFileSize = 0L;
+        long totalRowCount = 0L;
+        for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
+            List<ManifestEntry> manifestEntries = 
manifestFile.read(manifestFileMeta.fileName());
+            for (ManifestEntry entry : manifestEntries) {
+                if (entry.kind() == FileKind.ADD) {
+                    totalFileSize += entry.file().fileSize();
+                    totalRowCount += entry.file().rowCount();
+                } else {
+                    totalFileSize -= entry.file().fileSize();
+                    totalRowCount -= entry.file().rowCount();
+                }
+            }
+        }
+        return new long[] {totalFileSize, totalRowCount};

Review Comment:
   `computeTableStats` reads every data manifest and every manifest entry for 
the committed snapshot on each commit. For large/active tables this can add 
significant overhead to the tiering commit path and increase end-to-end 
latency. Consider guarding stats collection behind a config flag, sampling, 
caching, or using a cheaper built-in snapshot/statistics API if available, 
while keeping the existing best-effort behavior.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -165,10 +188,23 @@ protected LakeTableTieringManager(
         this.tableTierEpoch = new HashMap<>();
         this.tableLastTieredTime = new HashMap<>();
         this.delayedTieringByTableId = new HashMap<>();
+        this.tableLastTieringDuration = new HashMap<>();
+        this.tableTieringFailureCount = new HashMap<>();
+        this.tableLastLakeFileSize = new HashMap<>();
+        this.tableLastLakeRecordCount = new HashMap<>();
+        this.tieringMetricGroup = lakeTieringMetricGroup;
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, 
pendingTieringTables::size);
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, 
liveTieringTableIds::size);

Review Comment:
   The global lakeTiering gauges read `pendingTieringTables::size` (ArrayDeque) 
and `liveTieringTableIds::size` (HashMap) without holding the manager lock. 
Metric reporters may call these suppliers concurrently with state transitions, 
which can lead to racy/inconsistent values and potential concurrency issues. 
Wrap the suppliers in `inReadLock(lock, ...)` (or switch these structures to 
concurrent + maintain atomic counts) so metrics collection is thread-safe.
   ```suggestion
                   MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT,
                   () -> inReadLock(lock, () -> pendingTieringTables.size()));
           tieringMetricGroup.gauge(
                   MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT,
                   () -> inReadLock(lock, () -> liveTieringTableIds.size()));
   ```



##########
website/docs/maintenance/observability/monitor-metrics.md:
##########
@@ -384,6 +384,48 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>All kv snapshot size of each table bucket.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td rowspan="3">lakeTiering</td>
+      <td>pendingTablesCount</td>
+      <td>The number of tables waiting to be tiered.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>runningTablesCount</td>
+      <td>The number of tables currently being tiered.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>failuresTotal</td>
+      <td>The total number of tiering failures across all tables.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td rowspan="5">lakeTiering_table</td>
+      <td>tierLag</td>
+      <td>Time in milliseconds since the last successful tiering operation for 
this table. Returns -1 for newly created tables that have never been 
tiered.</td>
+      <td>Gauge</td>

Review Comment:
   This doc section lists `lakeTiering.failuresTotal` and states `tierLag` 
returns `-1` for newly created tables. In the current implementation, a global 
`failuresTotal` metric is not registered, and `tierLag` is computed from 
`tableLastTieredTime` which is initialized on table registration (so new tables 
will report ~0 rather than `-1`). Either implement the documented 
metrics/semantics or adjust the documentation to match the actual exported 
values.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -467,9 +567,17 @@ private void doHandleStateChange(long tableId, 
TieringState targetState) {
                 break;
             case Tiered:
                 tableLastTieredTime.put(tableId, clock.milliseconds());
-                liveTieringTableIds.remove(tableId);
+                // calculate and record tiering duration
+                Long startTime = liveTieringTableIds.remove(tableId);
+                if (startTime != null) {
+                    long duration = clock.milliseconds() - startTime;
+                    tableLastTieringDuration.put(tableId, duration);
+                }

Review Comment:
   `liveTieringTableIds` is documented/used as "last heartbeat time" (and is 
updated in `renewTieringHeartbeat`), but the Tiered-state transition treats the 
removed value as a tiering *start* time to compute `tierDuration`. In real runs 
with heartbeats, this will under-report duration (it will measure time since 
last heartbeat, not total tiering time). Track start time separately (or stop 
overwriting the start timestamp on heartbeat renewals) and keep heartbeat 
tracking in a different field.
   ```suggestion
                   // clean up heartbeat tracking for this table; duration must 
not use heartbeat time
                   liveTieringTableIds.remove(tableId);
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -224,13 +267,58 @@ private void scheduleTableTiering(long tableId) {
         lakeTieringScheduleTimer.add(delayedTiering);
     }
 
+    private void registerTableMetrics(long tableId, TablePath tablePath) {
+        // create table-level metric group
+        MetricGroup tableMetricGroup =
+                tieringMetricGroup.addTableLakeTieringMetricGroup(tableId, 
tablePath);
+
+        // tierLag: milliseconds since last successful tiering
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
+                () ->
+                        inReadLock(
+                                lock,
+                                () -> {
+                                    Long lastTiered = 
tableLastTieredTime.get(tableId);
+                                    return lastTiered != null
+                                            ? clock.milliseconds() - lastTiered
+                                            : -1L;
+                                }));
+
+        // tierDuration: duration of last tiering job
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
+                () -> inReadLock(lock, () -> 
tableLastTieringDuration.getOrDefault(tableId, 0L)));

Review Comment:
   `tierDuration` is initialized to `-1L` for unknown, but the gauge uses 
`getOrDefault(tableId, 0L)`, which can surface an incorrect `0` during races 
(e.g., if the table is removed before the metric group is fully deregistered). 
Use `-1L` as the default here to keep the "unknown" sentinel consistent.
   ```suggestion
                   () -> inReadLock(lock, () -> 
tableLastTieringDuration.getOrDefault(tableId, -1L)));
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:
##########
@@ -224,13 +267,58 @@ private void scheduleTableTiering(long tableId) {
         lakeTieringScheduleTimer.add(delayedTiering);
     }
 
+    private void registerTableMetrics(long tableId, TablePath tablePath) {
+        // create table-level metric group
+        MetricGroup tableMetricGroup =
+                tieringMetricGroup.addTableLakeTieringMetricGroup(tableId, 
tablePath);
+
+        // tierLag: milliseconds since last successful tiering
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
+                () ->
+                        inReadLock(
+                                lock,
+                                () -> {
+                                    Long lastTiered = 
tableLastTieredTime.get(tableId);
+                                    return lastTiered != null
+                                            ? clock.milliseconds() - lastTiered
+                                            : -1L;
+                                }));

Review Comment:
   The `tierLag` gauge is described as "since last successful tiering" and the 
docs say it returns `-1` for tables that have never been tiered, but the 
implementation uses `tableLastTieredTime` which is initialized when the table 
is registered. That means newly created tables will report a small/zero lag 
instead of `-1`. Consider tracking "last successful tiering time" separately 
(initialized to -1) or adjust the metric/doc semantics to match what is stored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to