luoyuxia commented on code in PR #2933:
URL: https://github.com/apache/fluss/pull/2933#discussion_r3013796528


##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java:
##########
@@ -135,6 +136,11 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
 
                         recordsRemaining -= records.size();
                     }
+
+                    // Only count bytes when the fetch is fully consumed
+                    if (nextInLineFetch.isConsumed()) {

Review Comment:
   IIUC, `totalBytesRead` here is not an accurate per-`ScanRecords` measurement. 
Because bytes are counted only once a fetch is fully consumed, some `ScanRecords` 
will return records with `totalBytesRead == 0`, while others will carry the full 
size of a completed fetch and look over-counted relative to the records returned 
in that poll.
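
To illustrate the skew, here is a toy model rather than the actual `LogFetchCollector` logic. All names (`ByteAccountingSketch`, `countOnConsumed`, `countPerRecord`) and the fixed per-record sizes are assumptions made up for this sketch. It compares crediting the whole fetch to the poll that drains it against summing the sizes of the records each poll actually returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one completed fetch drained over several polls; not Fluss code. */
class ByteAccountingSketch {

    /** Bytes reported per poll when the full fetch size is credited to the poll that drains it. */
    static List<Long> countOnConsumed(int[] recordSizes, int maxPollRecords) {
        long fetchSize = 0;
        for (int size : recordSizes) {
            fetchSize += size;
        }
        List<Long> perPoll = new ArrayList<>();
        for (int start = 0; start < recordSizes.length; start += maxPollRecords) {
            int end = Math.min(start + maxPollRecords, recordSizes.length);
            boolean consumed = end == recordSizes.length;
            // Every poll before the last one reports 0 even though it returned records.
            perPoll.add(consumed ? fetchSize : 0L);
        }
        return perPoll;
    }

    /** Bytes reported per poll when each poll sums the sizes of the records it returns. */
    static List<Long> countPerRecord(int[] recordSizes, int maxPollRecords) {
        List<Long> perPoll = new ArrayList<>();
        for (int start = 0; start < recordSizes.length; start += maxPollRecords) {
            int end = Math.min(start + maxPollRecords, recordSizes.length);
            long bytes = 0;
            for (int i = start; i < end; i++) {
                bytes += recordSizes[i];
            }
            perPoll.add(bytes);
        }
        return perPoll;
    }

    public static void main(String[] args) {
        int[] sizes = {100, 100, 100, 100, 100};
        System.out.println(countOnConsumed(sizes, 2)); // [0, 0, 500]
        System.out.println(countPerRecord(sizes, 2));  // [200, 200, 100]
    }
}
```

Both variants sum to the same total across polls, so an aggregate counter is unaffected; only the per-batch value differs.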



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -502,10 +514,15 @@ private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords(
         LakeWriter<WriteResult> lakeWriter =
                 getOrCreateLakeWriter(
                         bucket, checkNotNull(currentSnapshotSplit).getPartitionName());
+        long bytesRead = 0;
         while (recordIterator.hasNext()) {
             ScanRecord scanRecord = recordIterator.next().record();
             lakeWriter.write(scanRecord);
+            InternalRow row = scanRecord.getRow();
+            // Snapshot path always produces BinaryRow (CompactedRow/IndexedRow).

Review Comment:
   TieringSplitReader’s snapshot-path byte accounting will throw 
ClassCastException as soon as snapshot rows are projected or schema-remapped. 
   
   SnapshotFilesReader.next() can legitimately return a ProjectedRow, but 
TieringSplitReader.forSnapshotSplitRecords() unconditionally casts every 
InternalRow to BinaryRow. That makes the new metric path unsafe for 
evolved-schema snapshots or any snapshot scan with projectedFields.
   
   ```
   // SnapshotFilesReader
   BinaryValue originValue = valueDecoder.decodeValue(value);
   InternalRow originRow = originValue.row;
   if (targetSchemaId != originValue.schemaId) {
       int[] indexMapping =
               schemaProjectionCache.computeIfAbsent(
                       originValue.schemaId,
                       sourceSchemaId ->
                               SchemaUtil.getIndexMapping(
                                       schemaGetter.getSchema(sourceSchemaId), targetSchema));
       originRow = ProjectedRow.from(indexMapping).replaceRow(originRow);
   }
   ```
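
One possible shape for a safer accounting path, as a sketch only: check the runtime type before reading a size, and fall back when the row is a wrapper. The types below are stand-ins declared for this example (the real Fluss `InternalRow`, `BinaryRow`, and `ProjectedRow` are richer), `getSizeInBytes()` on the stand-in is an assumption, and `sizeOrUnknown` and `UNKNOWN_SIZE` are hypothetical names.

```java
/** Stand-in for the row abstraction; declared here for illustration only. */
interface InternalRow {}

/** Stand-in for a binary-backed row that knows its serialized size (assumed accessor). */
interface BinaryRow extends InternalRow {
    int getSizeInBytes();
}

/** Stand-in for a concrete binary row such as a compacted or indexed row. */
final class FakeBinaryRow implements BinaryRow {
    private final int sizeInBytes;

    FakeBinaryRow(int sizeInBytes) {
        this.sizeInBytes = sizeInBytes;
    }

    @Override
    public int getSizeInBytes() {
        return sizeInBytes;
    }
}

/** Stand-in for a projecting wrapper: an InternalRow that is NOT a BinaryRow. */
final class FakeProjectedRow implements InternalRow {
    final InternalRow wrapped;

    FakeProjectedRow(InternalRow wrapped) {
        this.wrapped = wrapped;
    }
}

class SnapshotBytesSketch {
    /** Hypothetical fallback when the row does not expose a size. */
    static final long UNKNOWN_SIZE = 0L;

    /** Returns the row size when available instead of casting unconditionally. */
    static long sizeOrUnknown(InternalRow row) {
        if (row instanceof BinaryRow) {
            return ((BinaryRow) row).getSizeInBytes();
        }
        // A projected or remapped row lands here rather than throwing ClassCastException.
        return UNKNOWN_SIZE;
    }

    public static void main(String[] args) {
        InternalRow binary = new FakeBinaryRow(64);
        InternalRow projected = new FakeProjectedRow(binary);
        System.out.println(sizeOrUnknown(binary));    // 64
        System.out.println(sizeOrUnknown(projected)); // 0
    }
}
```

Whether an unknown size should count as 0, be estimated, or be resolved by unwrapping the projected row is a design decision for the PR; the sketch only shows that the metric path does not need to throw.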



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java:
##########
@@ -37,12 +37,27 @@
  */
 @PublicEvolving
 public class ScanRecords implements Iterable<ScanRecord> {
-    public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
+    public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap(), 0);
 
     private final Map<TableBucket, List<ScanRecord>> records;
+    private final long totalBytesRead;
 
     public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
+        this(records, 0);
+    }
+
+    public ScanRecords(Map<TableBucket, List<ScanRecord>> records, long totalBytesRead) {
         this.records = records;
+        this.totalBytesRead = totalBytesRead;
+    }
+
+    /**
+     * Get the total bytes read from the Fluss log in this batch.
+     *
+     * @return the total bytes read
+     */
+    public long getTotalBytesRead() {

Review Comment:
   ```suggestion
       public long getTotalBytes() {
   ```
   nit: I think we can drop the `Read` suffix, since `ScanRecords` already 
implies a read/scan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.