Copilot commented on code in PR #2886:
URL: https://github.com/apache/fluss/pull/2886#discussion_r2969310463


##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java:
##########
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.aligned.AlignedRow;
+import org.apache.fluss.types.RowType;
+
+import java.util.Arrays;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Byte view implementation of LogRecordBatchStatistics that provides zero-copy access to statistics
+ * data without creating heap objects or copying data. Uses AlignedRow for better performance.
+ * Supports schema-aware format with partial column statistics.
+ *
+ * <h3>Statistics Layout Format</h3>
+ *
+ * <p>The binary format of statistics data stored in memory segment:
+ *
+ * <pre>
+ * 
+------------------+---------+-------------------------------------------------------+
+ * | Field            | Size    | Description                                  
         |
+ * 
+------------------+---------+-------------------------------------------------------+
+ * | Version          | 1 byte  | Statistics format version                    
         |
+ * | Column Count     | 2 bytes | Number of columns with statistics            
         |
+ * | Column Indexes   | 2*N     | Field indexes in original schema (2 bytes 
each)      |
+ * | Null Counts      | 4*N     | Null counts for each stats column (4 bytes 
each)     |
+ * | Min Values Size  | 4 bytes | Size of min values AlignedRow data           
        |
+ * | Min Values Data  | Variable| AlignedRow containing minimum values         
         |
+ * | Max Values Size  | 4 bytes | Size of max values AlignedRow data           
        |
+ * | Max Values Data  | Variable| AlignedRow containing maximum values         
         |
+ * 
+------------------+---------+-------------------------------------------------------+
+ * </pre>
+ *
+ * <p>The statistics support partial column statistics through statsIndexMapping, which maps
+ * statistics column positions to the original table schema field indexes. This allows efficient
+ * storage when only a subset of columns have statistics collected.
+ */
+public class DefaultLogRecordBatchStatistics implements LogRecordBatchStatistics {
+
+    private final MemorySegment segment;
+    private final int position;
+    private final int size;
+    private final RowType rowType;
+    private final int schemaId;
+
+    private final int[] statsIndexMapping;
+
+    private final Long[] statsNullCounts;
+
+    // Offsets for min/max values in the byte array
+    private final int minValuesOffset;
+    private final int maxValuesOffset;
+    private final int minValuesSize;
+    private final int maxValuesSize;
+
+    private InternalRow cachedMinRow;
+    private InternalRow cachedMaxRow;
+    private Long[] cachedNullCounts;
+
+    private final int[] reversedStatsIndexMapping;
+
+    /** Constructor for schema-aware statistics. */
+    public DefaultLogRecordBatchStatistics(
+            MemorySegment segment,
+            int position,
+            int size,
+            RowType rowType,
+            int schemaId,
+            Long[] nullCounts,
+            int minValuesOffset,
+            int maxValuesOffset,
+            int minValuesSize,
+            int maxValuesSize,
+            int[] statsIndexMapping) {
+        this.segment = segment;
+        this.position = position;
+        this.size = size;
+        this.rowType = rowType;
+        this.schemaId = schemaId;
+        this.statsNullCounts = nullCounts;
+        this.minValuesOffset = minValuesOffset;
+        this.maxValuesOffset = maxValuesOffset;
+        this.minValuesSize = minValuesSize;
+        this.maxValuesSize = maxValuesSize;
+        this.statsIndexMapping = statsIndexMapping;
+        this.reversedStatsIndexMapping = new int[rowType.getFieldCount()];
+        Arrays.fill(this.reversedStatsIndexMapping, -1);
+        for (int statsIndex = 0; statsIndex < statsIndexMapping.length; statsIndex++) {
+            this.reversedStatsIndexMapping[statsIndexMapping[statsIndex]] = statsIndex;
+        }
+    }
+
+    @Override
+    public InternalRow getMinValues() {
+        if (minValuesSize <= 0) {
+            return null;
+        }
+
+        // Return cached row if already created
+        if (cachedMinRow != null) {
+            return cachedMinRow;
+        }
+
+        AlignedRow minRow = new AlignedRow(rowType.getFieldCount());
+        minRow.pointTo(segment, position + minValuesOffset, minValuesSize);
+
+        this.cachedMinRow = new FullRowWrapper(minRow, reversedStatsIndexMapping);
+        return this.cachedMinRow;
+    }
+
+    @Override
+    public InternalRow getMaxValues() {
+        if (maxValuesSize <= 0) {
+            return null;
+        }
+
+        // Return cached row if already created
+        if (cachedMaxRow != null) {
+            return cachedMaxRow;
+        }
+
+        AlignedRow maxRow = new AlignedRow(rowType.getFieldCount());
+        maxRow.pointTo(segment, position + maxValuesOffset, maxValuesSize);
+
+        this.cachedMaxRow = new FullRowWrapper(maxRow, reversedStatsIndexMapping);
+        return this.cachedMaxRow;
+    }
+
+    @Override
+    public Long[] getNullCounts() {
+        if (cachedNullCounts != null) {
+            return cachedNullCounts;
+        }
+        cachedNullCounts = new Long[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            if (this.reversedStatsIndexMapping[i] >= 0) {
+                cachedNullCounts[i] = statsNullCounts[reversedStatsIndexMapping[i]];
+            } else {
+                cachedNullCounts[i] = -1L;
+            }
+        }
+        return cachedNullCounts;
+    }
+
+    @Override
+    public boolean hasColumnStatistics(int fieldIndex) {
+        return reversedStatsIndexMapping[fieldIndex] != -1;
+    }
+
+    /**
+     * Get the null count for a specific field.
+     *
+     * @param fieldIndex The field index
+     * @return The null count for the field
+     */
+    public long getNullCount(int fieldIndex) {
+        return statsNullCounts[fieldIndex] != null ? statsNullCounts[fieldIndex].longValue() : 0L;

Review Comment:
   `getNullCount(int fieldIndex)` indexes directly into `statsNullCounts`, but `statsNullCounts` is sized by the number of *stats columns* (statsIndexMapping), not the full schema field count. This will return wrong results (or throw) when statistics are collected for a subset of columns. It should map `fieldIndex` through `reversedStatsIndexMapping` (and likely throw/return a sentinel when a column has no stats).
   ```suggestion
        * @param fieldIndex The field index in the full row schema
        * @return The null count for the field, or -1L if no statistics are 
available
        */
       public long getNullCount(int fieldIndex) {
           checkArgument(
                   fieldIndex >= 0 && fieldIndex < rowType.getFieldCount(),
                   "fieldIndex out of range: " + fieldIndex);
   
           int statsIndex = reversedStatsIndexMapping[fieldIndex];
           if (statsIndex < 0) {
               // No statistics collected for this column; use the same 
sentinel as getNullCounts()
               return -1L;
           }
   
           Long count = statsNullCounts[statsIndex];
           return count != null ? count.longValue() : 0L;
   ```



##########
fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java:
##########
@@ -154,4 +157,42 @@ public AutoPartitionStrategy getAutoPartitionStrategy() {
     public long getAutoIncrementCacheSize() {
         return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE);
     }
+
+    /** Gets whether statistics collection is enabled for the table. */
+    public boolean isStatisticsEnabled() {
+        String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
+        return !columnsStr.isEmpty();
+    }
+
+    /**
+     * Gets the statistics columns configuration of the table.
+     *
+     * @return Optional containing the list of column names if specific 
columns are configured,
+     *     empty if all non-binary columns should be collected ("*" 
configuration), null if
+     *     statistics collection is disabled (empty string configuration)
+     */
+    public Optional<List<String>> getStatisticsColumns() {
+        String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
+        if (columnsStr.isEmpty()) {
+            return null; // null means statistics collection is disabled
+        }
+        if ("*".equals(columnsStr)) {
+            return Optional.empty(); // Empty means collect all non-binary 
columns
+        }

Review Comment:
   `getStatisticsColumns()` returns `null` when statistics are disabled, even though the return type is `Optional<List<String>>`. Returning null from an Optional-typed method defeats the purpose of Optional and is easy to misuse (NPE risk). Consider returning `Optional.empty()` for the "not configured" case and using `isStatisticsEnabled()` to distinguish disabled vs wildcard, or introduce a small tri-state type to model {disabled, all, subset}.
   ```suggestion
        * <p>This method only returns a non-empty {@link Optional} when 
statistics collection is
        * configured for a specific subset of columns. For other 
configurations, use
        * {@link #isStatisticsEnabled()} and {@link 
#isCollectAllNonBinaryColumns()} to distinguish
        * between disabled and wildcard ("*") collection.
        *
        * @return Optional containing the list of column names if specific 
columns are configured,
        *     or {@link Optional#empty()} if statistics are disabled (empty 
string configuration) or
        *     all non-binary columns should be collected ("*" configuration)
        */
       public Optional<List<String>> getStatisticsColumns() {
           String columnsStr = 
config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS);
           if (columnsStr.isEmpty() || "*".equals(columnsStr)) {
               // Empty Optional means either statistics are disabled or all 
non-binary columns
               // should be collected. Use isStatisticsEnabled() and 
isCollectAllNonBinaryColumns()
               // to distinguish these cases.
               return Optional.empty();
           }
   ```



##########
fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java:
##########
@@ -124,6 +139,82 @@ public int position() {
             return position;
         }
 
+        public BytesView getBytesView() {
+            return getBytesView(false);
+        }
+
+        /**
+         * Get the BytesView of this record batch.
+         *
+         * @param trimStatistics whether to trim statistics data from the batch
+         * @return the BytesView of this record batch, possibly without 
statistics
+         */
+        public BytesView getBytesView(boolean trimStatistics) {
+            if (!trimStatistics || magic < LOG_MAGIC_VALUE_V2) {
+                // No trimming needed, or statistics not supported in this 
version
+                return new FileRegionBytesView(fileRecords.channel(), 
position, sizeInBytes());
+            }
+
+            // Check if this batch has statistics
+            DefaultLogRecordBatch header = loadBatchHeader();
+            int statisticsLength = header.getStatisticsLength();
+
+            if (statisticsLength == 0) {
+                // No statistics present
+                return new FileRegionBytesView(fileRecords.channel(), 
position, sizeInBytes());
+            }
+
+            // Create a modified view that skips statistics data
+            return createTrimmedBytesView(statisticsLength);
+        }
+
+        /**
+         * Create a BytesView with statistics trimmed by modifying the header 
fields. In V2 format,
+         * statistics are placed between the header and records, so we skip 
the statistics portion.
+         *
+         * @param statisticsLength the length of statistics data to skip
+         * @return a BytesView with modified header and trimmed data
+         */
+        private BytesView createTrimmedBytesView(int statisticsLength) {
+            try {
+                int headerSize = recordBatchHeaderSize(magic);
+
+                // Calculate the new batch size without statistics
+                int newBatchSizeInBytes = sizeInBytes() - statisticsLength;
+                // Load the original header
+                byte[] modifiedHeaderBytes = new byte[headerSize];
+                cachedHeaderBuffer.rewind();
+                cachedHeaderBuffer.get(modifiedHeaderBytes);
+                ByteBuffer modifiedHeader = 
ByteBuffer.wrap(modifiedHeaderBytes);
+                modifiedHeader.order(ByteOrder.LITTLE_ENDIAN);
+
+                // Update the length field
+                modifiedHeader.position(LENGTH_OFFSET);
+                modifiedHeader.putInt(newBatchSizeInBytes - LOG_OVERHEAD);
+
+                // Clear statistics information from header
+                LogRecordBatchFormat.clearStatisticsFromHeader(modifiedHeader, 
magic);
+

Review Comment:
   `createTrimmedBytesView` updates the LENGTH field and clears the statistics flag/length, but it does not recompute the batch CRC. Since CRC is validated from `schemaIdOffset` to the end of the batch, trimming bytes will make `ensureValid()` fail whenever CRC checking is enabled. Either recompute and rewrite the CRC in the modified header, or clearly document/guarantee this path is only used when CRC checks are skipped.



##########
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java:
##########
@@ -287,12 +417,73 @@ private void writeBatchHeader() throws IOException {
         outputView.writeInt(batchSequence);
         outputView.writeInt(recordCount);
 
-        // Update crc.
-        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), 
schemaIdOffset(magic));
+        // For V2, write statistics length
+        if (magic >= LogRecordBatchFormat.LOG_MAGIC_VALUE_V2) {
+            outputView.writeInt(statsSize);
+        }
+
+        // Update crc - CRC covers from schemaId to end of the batch.
+        // Since statistics bytes are in a separate view (not in 
pagedOutputView segments),
+        // we need to compute CRC across all views in bytesView.
+        long crc = computeCrc();
         outputView.setPosition(crcOffset(magic));
         outputView.writeUnsignedInt(crc);
     }
 
+    private int statisticsBytesLength() {
+        return statisticsBytes != null ? statisticsBytes.length : 0;
+    }
+
+    /**
+     * Compute CRC over the entire batch content starting from schemaId 
offset. This handles the
+     * case where statistics bytes are in a separate view between header and 
records.
+     */
+    private long computeCrc() {
+        if (statisticsBytesLength() == 0) {
+            return Crc32C.compute(pagedOutputView.getWrittenSegments(), 
schemaIdOffset(magic));
+        }
+
+        // With statistics, we need to compute CRC across:
+        // [header from schemaId] + [statistics bytes] + [records from 
pagedOutputView after header]
+        int headerSize = recordBatchHeaderSize(magic);
+        int schemaIdOff = schemaIdOffset(magic);
+
+        // Collect all bytes for CRC computation
+        int totalCrcBytes = bytesView.getBytesLength() - schemaIdOff;
+        byte[] crcBuffer = new byte[totalCrcBytes];
+        int pos = 0;

Review Comment:
   `computeCrc()` allocates a `byte[]` of size `bytesView.getBytesLength() - schemaIdOff` and copies the entire batch into it before computing CRC. For large batches this is a significant allocation/GC cost and can cause OOM. Prefer computing the checksum incrementally (e.g., `Checksum crc = Crc32C.create()` + `Checksums.update(...)` over the header slice, `statisticsBytes`, then the remaining MemorySegment segments) to avoid materializing the whole batch on heap.



##########
fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java:
##########
@@ -122,6 +126,122 @@ public void setCurrentProjection(
         this.selectedFieldPositions = selectedFieldPositions;
     }
 
+    public BytesView projectRecordBatch(FileChannelLogRecordBatch batch) 
throws IOException {
+        // Get schema ID from the batch to determine the projection
+        FileChannel channel = batch.fileRecords.channel();
+        int position = batch.position();
+
+        // Read the log header to get schema ID
+        logHeaderBuffer.rewind();
+        readLogHeaderFullyOrFail(channel, logHeaderBuffer, position);
+        logHeaderBuffer.rewind();
+        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+        logHeaderBuffer.rewind();

Review Comment:
   `projectRecordBatch` reads a full header into a V2-sized `logHeaderBuffer` via `readLogHeaderFullyOrFail(...)`, but that helper currently only validates short reads for V0/V1. With `magic == V2`, a truncated file/header can slip through and later reads may fail with less-informative errors. Consider extending `readLogHeaderFullyOrFail` to explicitly validate the V2 header size when `magic == LOG_MAGIC_VALUE_V2`.
   ```suggestion
   
           // For V2 batches, ensure we have read the full V2 header; otherwise,
           // a truncated header could slip through and cause less-informative 
errors later.
           if (magic == LOG_MAGIC_VALUE_V2) {
               logHeaderBuffer.rewind();
               readFullyOrFail(channel, logHeaderBuffer, position, 
V2_RECORD_BATCH_HEADER_SIZE);
               logHeaderBuffer.rewind();
           }
   ```



##########
fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java:
##########
@@ -351,6 +357,293 @@ void testResetWriterState() throws Exception {
         assertThat(recordBatch.batchSequence()).isEqualTo(1);
     }
 
+    @Test
+    void testStatisticsWriteAndRead() throws Exception {
+        // Create test data with different data types to test statistics 
collection
+        List<Object[]> testData =
+                Arrays.asList(
+                        new Object[] {1, "a", 10.5},
+                        new Object[] {2, "b", 20.7},
+                        new Object[] {3, "c", 15.2},
+                        new Object[] {4, "d", 30.1},
+                        new Object[] {5, "e", 8.9});
+
+        // Create row type for test data
+        RowType testRowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField("id", DataTypes.INT()),
+                                new DataField("name", DataTypes.STRING()),
+                                new DataField("value", DataTypes.DOUBLE())));
+
+        // Create ArrowWriter and builder
+        ArrowWriter writer =
+                provider.getOrCreateWriter(
+                        1L, DEFAULT_SCHEMA_ID, 1024 * 10, testRowType, 
NO_COMPRESSION);
+        MemoryLogRecordsArrowBuilder builder =
+                createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10, 
LOG_MAGIC_VALUE_V2);
+
+        // Append test data
+        List<ChangeType> changeTypes =
+                testData.stream().map(row -> 
ChangeType.APPEND_ONLY).collect(Collectors.toList());
+        List<InternalRow> rows =
+                
testData.stream().map(DataTestUtils::row).collect(Collectors.toList());
+
+        for (int i = 0; i < testData.size(); i++) {
+            builder.append(changeTypes.get(i), rows.get(i));
+        }
+
+        // Set writer state and close
+        builder.setWriterState(1L, 0);
+        builder.close();
+
+        // Build and create MemoryLogRecords
+        MemoryLogRecords records = 
MemoryLogRecords.pointToBytesView(builder.build());
+
+        // Verify basic properties
+        assertThat(records.sizeInBytes()).isGreaterThan(0);
+        Iterator<LogRecordBatch> iterator = records.batches().iterator();
+        assertThat(iterator.hasNext()).isTrue();
+        LogRecordBatch batch = iterator.next();
+        assertThat(iterator.hasNext()).isFalse();
+
+        // Verify batch properties
+        assertThat(batch.getRecordCount()).isEqualTo(testData.size());
+        assertThat(batch.baseLogOffset()).isEqualTo(0);
+        assertThat(batch.lastLogOffset()).isEqualTo(testData.size() - 1);
+        assertThat(batch.nextLogOffset()).isEqualTo(testData.size());
+        assertThat(batch.writerId()).isEqualTo(1L);
+        assertThat(batch.batchSequence()).isEqualTo(0);
+        
assertThat(batch.magic()).isEqualTo(LogRecordBatchFormat.LOG_MAGIC_VALUE_V2);
+
+        // Create read context
+        LogRecordReadContext readContext =
+                LogRecordReadContext.createArrowReadContext(
+                        testRowType, DEFAULT_SCHEMA_ID, TEST_SCHEMA_GETTER);
+
+        // Test statistics reading
+        Optional<LogRecordBatchStatistics> statisticsOpt = 
batch.getStatistics(readContext);
+        assertThat(statisticsOpt).isPresent();
+
+        LogRecordBatchStatistics statistics = statisticsOpt.get();
+        assertThat(statistics.getMinValues()).isNotNull();
+        assertThat(statistics.getMaxValues()).isNotNull();
+        assertThat(statistics.getNullCounts()).isNotNull();
+
+        // Verify statistics for each field
+        // Field 0: id (INT)
+        assertThat(statistics.getMinValues().getInt(0)).isEqualTo(1); // min id
+        assertThat(statistics.getMaxValues().getInt(0)).isEqualTo(5); // max id
+        assertThat(statistics.getNullCounts()[0]).isEqualTo(0); // no nulls
+
+        // Field 1: name (STRING) - string statistics are not collected in 
current implementation

Review Comment:
   The test comment says string statistics are not collected, but `LogRecordBatchStatisticsCollector` does collect STRING min/max (see the collector's STRING case). This comment is misleading; either remove it or add assertions for the STRING field's min/max to reflect the current behavior.
   ```suggestion
           // Field 1: name (STRING)
   ```



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1579,6 +1579,18 @@ public class ConfigOptions {
                                     + "This mode reduces storage and 
transmission costs but loses the ability to track previous values. "
                                     + "This option only affects primary key 
tables.");
 
+    public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS =
+            key("table.statistics.columns")
+                    .stringType()
+                    .defaultValue("*")
+                    .withDescription(
+                            "Configures statistics collection for the table. "
+                                    + "Empty string ('') (default) means 
disable statistics collection completely. "
+                                    + "The value '*' means collect statistics 
for all non-binary columns. "

Review Comment:
   `TABLE_STATISTICS_COLUMNS` sets `defaultValue("*")`, but the description says "Empty string ('') (default) means disable". Either the default should be "" or the description should be updated; otherwise users will be misled and stats collection will be enabled by default in practice.
   ```suggestion
                                       + "Empty string ('') means disable 
statistics collection completely. "
                                       + "The value '*' (default) means collect 
statistics for all non-binary columns. "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to