Copilot commented on code in PR #2886: URL: https://github.com/apache/fluss/pull/2886#discussion_r2969310463
########## fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java: ########## @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalMap; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.aligned.AlignedRow; +import org.apache.fluss.types.RowType; + +import java.util.Arrays; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Byte view implementation of LogRecordBatchStatistics that provides zero-copy access to statistics + * data without creating heap objects or copying data. Uses AlignedRow for better performance. + * Supports schema-aware format with partial column statistics. 
+ * + * <h3>Statistics Layout Format</h3> + * + * <p>The binary format of statistics data stored in memory segment: + * + * <pre> + * +------------------+---------+-------------------------------------------------------+ + * | Field | Size | Description | + * +------------------+---------+-------------------------------------------------------+ + * | Version | 1 byte | Statistics format version | + * | Column Count | 2 bytes | Number of columns with statistics | + * | Column Indexes | 2*N | Field indexes in original schema (2 bytes each) | + * | Null Counts | 4*N | Null counts for each stats column (4 bytes each) | + * | Min Values Size | 4 bytes | Size of min values AlignedRow data | + * | Min Values Data | Variable| AlignedRow containing minimum values | + * | Max Values Size | 4 bytes | Size of max values AlignedRow data | + * | Max Values Data | Variable| AlignedRow containing maximum values | + * +------------------+---------+-------------------------------------------------------+ + * </pre> + * + * <p>The statistics support partial column statistics through statsIndexMapping, which maps + * statistics column positions to the original table schema field indexes. This allows efficient + * storage when only a subset of columns have statistics collected. 
+ */ +public class DefaultLogRecordBatchStatistics implements LogRecordBatchStatistics { + + private final MemorySegment segment; + private final int position; + private final int size; + private final RowType rowType; + private final int schemaId; + + private final int[] statsIndexMapping; + + private final Long[] statsNullCounts; + + // Offsets for min/max values in the byte array + private final int minValuesOffset; + private final int maxValuesOffset; + private final int minValuesSize; + private final int maxValuesSize; + + private InternalRow cachedMinRow; + private InternalRow cachedMaxRow; + private Long[] cachedNullCounts; + + private final int[] reversedStatsIndexMapping; + + /** Constructor for schema-aware statistics. */ + public DefaultLogRecordBatchStatistics( + MemorySegment segment, + int position, + int size, + RowType rowType, + int schemaId, + Long[] nullCounts, + int minValuesOffset, + int maxValuesOffset, + int minValuesSize, + int maxValuesSize, + int[] statsIndexMapping) { + this.segment = segment; + this.position = position; + this.size = size; + this.rowType = rowType; + this.schemaId = schemaId; + this.statsNullCounts = nullCounts; + this.minValuesOffset = minValuesOffset; + this.maxValuesOffset = maxValuesOffset; + this.minValuesSize = minValuesSize; + this.maxValuesSize = maxValuesSize; + this.statsIndexMapping = statsIndexMapping; + this.reversedStatsIndexMapping = new int[rowType.getFieldCount()]; + Arrays.fill(this.reversedStatsIndexMapping, -1); + for (int statsIndex = 0; statsIndex < statsIndexMapping.length; statsIndex++) { + this.reversedStatsIndexMapping[statsIndexMapping[statsIndex]] = statsIndex; + } + } + + @Override + public InternalRow getMinValues() { + if (minValuesSize <= 0) { + return null; + } + + // Return cached row if already created + if (cachedMinRow != null) { + return cachedMinRow; + } + + AlignedRow minRow = new AlignedRow(rowType.getFieldCount()); + minRow.pointTo(segment, position + minValuesOffset, 
minValuesSize); + + this.cachedMinRow = new FullRowWrapper(minRow, reversedStatsIndexMapping); + return this.cachedMinRow; + } + + @Override + public InternalRow getMaxValues() { + if (maxValuesSize <= 0) { + return null; + } + + // Return cached row if already created + if (cachedMaxRow != null) { + return cachedMaxRow; + } + + AlignedRow maxRow = new AlignedRow(rowType.getFieldCount()); + maxRow.pointTo(segment, position + maxValuesOffset, maxValuesSize); + + this.cachedMaxRow = new FullRowWrapper(maxRow, reversedStatsIndexMapping); + return this.cachedMaxRow; + } + + @Override + public Long[] getNullCounts() { + if (cachedNullCounts != null) { + return cachedNullCounts; + } + cachedNullCounts = new Long[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + if (this.reversedStatsIndexMapping[i] >= 0) { + cachedNullCounts[i] = statsNullCounts[reversedStatsIndexMapping[i]]; + } else { + cachedNullCounts[i] = -1L; + } + } + return cachedNullCounts; + } + + @Override + public boolean hasColumnStatistics(int fieldIndex) { + return reversedStatsIndexMapping[fieldIndex] != -1; + } + + /** + * Get the null count for a specific field. + * + * @param fieldIndex The field index + * @return The null count for the field + */ + public long getNullCount(int fieldIndex) { + return statsNullCounts[fieldIndex] != null ? statsNullCounts[fieldIndex].longValue() : 0L; Review Comment: `getNullCount(int fieldIndex)` indexes directly into `statsNullCounts`, but `statsNullCounts` is sized by the number of *stats columns* (statsIndexMapping), not the full schema field count. This will return wrong results (or throw) when statistics are collected for a subset of columns. It should map `fieldIndex` through `reversedStatsIndexMapping` (and likely throw, or return a sentinel value, when a column has no statistics).
```suggestion * @param fieldIndex The field index in the full row schema * @return The null count for the field, or -1L if no statistics are available */ public long getNullCount(int fieldIndex) { checkArgument( fieldIndex >= 0 && fieldIndex < rowType.getFieldCount(), "fieldIndex out of range: " + fieldIndex); int statsIndex = reversedStatsIndexMapping[fieldIndex]; if (statsIndex < 0) { // No statistics collected for this column; use the same sentinel as getNullCounts() return -1L; } Long count = statsNullCounts[statsIndex]; return count != null ? count.longValue() : 0L; ``` ########## fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java: ########## @@ -154,4 +157,42 @@ public AutoPartitionStrategy getAutoPartitionStrategy() { public long getAutoIncrementCacheSize() { return config.get(ConfigOptions.TABLE_AUTO_INCREMENT_CACHE_SIZE); } + + /** Gets whether statistics collection is enabled for the table. */ + public boolean isStatisticsEnabled() { + String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS); + return !columnsStr.isEmpty(); + } + + /** + * Gets the statistics columns configuration of the table. + * + * @return Optional containing the list of column names if specific columns are configured, + * empty if all non-binary columns should be collected ("*" configuration), null if + * statistics collection is disabled (empty string configuration) + */ + public Optional<List<String>> getStatisticsColumns() { + String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS); + if (columnsStr.isEmpty()) { + return null; // null means statistics collection is disabled + } + if ("*".equals(columnsStr)) { + return Optional.empty(); // Empty means collect all non-binary columns + } Review Comment: `getStatisticsColumns()` returns `null` when statistics are disabled, even though the return type is `Optional<List<String>>`. Returning null from an Optional-typed method defeats the purpose of Optional and is easy to misuse (NPE risk). 
Consider returning `Optional.empty()` for the "not configured" case and using `isStatisticsEnabled()` to distinguish disabled vs wildcard, or introduce a small tri-state type to model {disabled, all, subset}. ```suggestion * <p>This method only returns a non-empty {@link Optional} when statistics collection is * configured for a specific subset of columns. For other configurations, use * {@link #isStatisticsEnabled()} and {@link #isCollectAllNonBinaryColumns()} to distinguish * between disabled and wildcard ("*") collection. * * @return Optional containing the list of column names if specific columns are configured, * or {@link Optional#empty()} if statistics are disabled (empty string configuration) or * all non-binary columns should be collected ("*" configuration) */ public Optional<List<String>> getStatisticsColumns() { String columnsStr = config.get(ConfigOptions.TABLE_STATISTICS_COLUMNS); if (columnsStr.isEmpty() || "*".equals(columnsStr)) { // Empty Optional means either statistics are disabled or all non-binary columns // should be collected. Use isStatisticsEnabled() and isCollectAllNonBinaryColumns() // to distinguish these cases. return Optional.empty(); } ``` ########## fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java: ########## @@ -124,6 +139,82 @@ public int position() { return position; } + public BytesView getBytesView() { + return getBytesView(false); + } + + /** + * Get the BytesView of this record batch. 
+ * + * @param trimStatistics whether to trim statistics data from the batch + * @return the BytesView of this record batch, possibly without statistics + */ + public BytesView getBytesView(boolean trimStatistics) { + if (!trimStatistics || magic < LOG_MAGIC_VALUE_V2) { + // No trimming needed, or statistics not supported in this version + return new FileRegionBytesView(fileRecords.channel(), position, sizeInBytes()); + } + + // Check if this batch has statistics + DefaultLogRecordBatch header = loadBatchHeader(); + int statisticsLength = header.getStatisticsLength(); + + if (statisticsLength == 0) { + // No statistics present + return new FileRegionBytesView(fileRecords.channel(), position, sizeInBytes()); + } + + // Create a modified view that skips statistics data + return createTrimmedBytesView(statisticsLength); + } + + /** + * Create a BytesView with statistics trimmed by modifying the header fields. In V2 format, + * statistics are placed between the header and records, so we skip the statistics portion. 
+ * + * @param statisticsLength the length of statistics data to skip + * @return a BytesView with modified header and trimmed data + */ + private BytesView createTrimmedBytesView(int statisticsLength) { + try { + int headerSize = recordBatchHeaderSize(magic); + + // Calculate the new batch size without statistics + int newBatchSizeInBytes = sizeInBytes() - statisticsLength; + // Load the original header + byte[] modifiedHeaderBytes = new byte[headerSize]; + cachedHeaderBuffer.rewind(); + cachedHeaderBuffer.get(modifiedHeaderBytes); + ByteBuffer modifiedHeader = ByteBuffer.wrap(modifiedHeaderBytes); + modifiedHeader.order(ByteOrder.LITTLE_ENDIAN); + + // Update the length field + modifiedHeader.position(LENGTH_OFFSET); + modifiedHeader.putInt(newBatchSizeInBytes - LOG_OVERHEAD); + + // Clear statistics information from header + LogRecordBatchFormat.clearStatisticsFromHeader(modifiedHeader, magic); + Review Comment: `createTrimmedBytesView` updates the LENGTH field and clears the statistics flag/length, but it does not recompute the batch CRC. Since CRC is validated from `schemaIdOffset` to the end of the batch, trimming bytes will make `ensureValid()` fail whenever CRC checking is enabled. Either recompute and rewrite the CRC in the modified header, or clearly document/guarantee this path is only used when CRC checks are skipped. ########## fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java: ########## @@ -287,12 +417,73 @@ private void writeBatchHeader() throws IOException { outputView.writeInt(batchSequence); outputView.writeInt(recordCount); - // Update crc. - long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic)); + // For V2, write statistics length + if (magic >= LogRecordBatchFormat.LOG_MAGIC_VALUE_V2) { + outputView.writeInt(statsSize); + } + + // Update crc - CRC covers from schemaId to end of the batch. 
+ // Since statistics bytes are in a separate view (not in pagedOutputView segments), + // we need to compute CRC across all views in bytesView. + long crc = computeCrc(); outputView.setPosition(crcOffset(magic)); outputView.writeUnsignedInt(crc); } + private int statisticsBytesLength() { + return statisticsBytes != null ? statisticsBytes.length : 0; + } + + /** + * Compute CRC over the entire batch content starting from schemaId offset. This handles the + * case where statistics bytes are in a separate view between header and records. + */ + private long computeCrc() { + if (statisticsBytesLength() == 0) { + return Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic)); + } + + // With statistics, we need to compute CRC across: + // [header from schemaId] + [statistics bytes] + [records from pagedOutputView after header] + int headerSize = recordBatchHeaderSize(magic); + int schemaIdOff = schemaIdOffset(magic); + + // Collect all bytes for CRC computation + int totalCrcBytes = bytesView.getBytesLength() - schemaIdOff; + byte[] crcBuffer = new byte[totalCrcBytes]; + int pos = 0; Review Comment: `computeCrc()` allocates a `byte[]` of size `bytesView.getBytesLength() - schemaIdOff` and copies nearly the entire batch (everything from the schema ID onward) into it before computing the CRC. For large batches this is a significant allocation/GC cost and can cause OOM. Prefer computing the checksum incrementally (e.g., `Checksum crc = Crc32C.create()` + `Checksums.update(...)` over the header slice, `statisticsBytes`, then the remaining MemorySegment segments) to avoid materializing the whole batch on the heap.
########## fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java: ########## @@ -122,6 +126,122 @@ public void setCurrentProjection( this.selectedFieldPositions = selectedFieldPositions; } + public BytesView projectRecordBatch(FileChannelLogRecordBatch batch) throws IOException { + // Get schema ID from the batch to determine the projection + FileChannel channel = batch.fileRecords.channel(); + int position = batch.position(); + + // Read the log header to get schema ID + logHeaderBuffer.rewind(); + readLogHeaderFullyOrFail(channel, logHeaderBuffer, position); + logHeaderBuffer.rewind(); + byte magic = logHeaderBuffer.get(MAGIC_OFFSET); + logHeaderBuffer.rewind(); Review Comment: `projectRecordBatch` reads a full header into a V2-sized `logHeaderBuffer` via `readLogHeaderFullyOrFail(...)`, but that helper currently only validates short reads for V0/V1. With `magic == V2`, a truncated file/header can slip through and later reads may fail with less-informative errors. Consider extending `readLogHeaderFullyOrFail` to explicitly validate V2 header size when `magic == LOG_MAGIC_VALUE_V2`. ```suggestion // For V2 batches, ensure we have read the full V2 header; otherwise, // a truncated header could slip through and cause less-informative errors later. 
if (magic == LOG_MAGIC_VALUE_V2) { logHeaderBuffer.rewind(); readFullyOrFail(channel, logHeaderBuffer, position, V2_RECORD_BATCH_HEADER_SIZE); logHeaderBuffer.rewind(); } ``` ########## fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java: ########## @@ -351,6 +357,293 @@ void testResetWriterState() throws Exception { assertThat(recordBatch.batchSequence()).isEqualTo(1); } + @Test + void testStatisticsWriteAndRead() throws Exception { + // Create test data with different data types to test statistics collection + List<Object[]> testData = + Arrays.asList( + new Object[] {1, "a", 10.5}, + new Object[] {2, "b", 20.7}, + new Object[] {3, "c", 15.2}, + new Object[] {4, "d", 30.1}, + new Object[] {5, "e", 8.9}); + + // Create row type for test data + RowType testRowType = + new RowType( + Arrays.asList( + new DataField("id", DataTypes.INT()), + new DataField("name", DataTypes.STRING()), + new DataField("value", DataTypes.DOUBLE()))); + + // Create ArrowWriter and builder + ArrowWriter writer = + provider.getOrCreateWriter( + 1L, DEFAULT_SCHEMA_ID, 1024 * 10, testRowType, NO_COMPRESSION); + MemoryLogRecordsArrowBuilder builder = + createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10, LOG_MAGIC_VALUE_V2); + + // Append test data + List<ChangeType> changeTypes = + testData.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList()); + List<InternalRow> rows = + testData.stream().map(DataTestUtils::row).collect(Collectors.toList()); + + for (int i = 0; i < testData.size(); i++) { + builder.append(changeTypes.get(i), rows.get(i)); + } + + // Set writer state and close + builder.setWriterState(1L, 0); + builder.close(); + + // Build and create MemoryLogRecords + MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build()); + + // Verify basic properties + assertThat(records.sizeInBytes()).isGreaterThan(0); + Iterator<LogRecordBatch> iterator = records.batches().iterator(); + 
assertThat(iterator.hasNext()).isTrue(); + LogRecordBatch batch = iterator.next(); + assertThat(iterator.hasNext()).isFalse(); + + // Verify batch properties + assertThat(batch.getRecordCount()).isEqualTo(testData.size()); + assertThat(batch.baseLogOffset()).isEqualTo(0); + assertThat(batch.lastLogOffset()).isEqualTo(testData.size() - 1); + assertThat(batch.nextLogOffset()).isEqualTo(testData.size()); + assertThat(batch.writerId()).isEqualTo(1L); + assertThat(batch.batchSequence()).isEqualTo(0); + assertThat(batch.magic()).isEqualTo(LogRecordBatchFormat.LOG_MAGIC_VALUE_V2); + + // Create read context + LogRecordReadContext readContext = + LogRecordReadContext.createArrowReadContext( + testRowType, DEFAULT_SCHEMA_ID, TEST_SCHEMA_GETTER); + + // Test statistics reading + Optional<LogRecordBatchStatistics> statisticsOpt = batch.getStatistics(readContext); + assertThat(statisticsOpt).isPresent(); + + LogRecordBatchStatistics statistics = statisticsOpt.get(); + assertThat(statistics.getMinValues()).isNotNull(); + assertThat(statistics.getMaxValues()).isNotNull(); + assertThat(statistics.getNullCounts()).isNotNull(); + + // Verify statistics for each field + // Field 0: id (INT) + assertThat(statistics.getMinValues().getInt(0)).isEqualTo(1); // min id + assertThat(statistics.getMaxValues().getInt(0)).isEqualTo(5); // max id + assertThat(statistics.getNullCounts()[0]).isEqualTo(0); // no nulls + + // Field 1: name (STRING) - string statistics are not collected in current implementation Review Comment: The test comment says string statistics are not collected, but `LogRecordBatchStatisticsCollector` does collect STRING min/max (see collector's STRING case). This comment is misleading; either remove it or add assertions for the STRING field’s min/max to reflect the current behavior. 
```suggestion // Field 1: name (STRING) ``` ########## fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java: ########## @@ -1579,6 +1579,18 @@ public class ConfigOptions { + "This mode reduces storage and transmission costs but loses the ability to track previous values. " + "This option only affects primary key tables."); + public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS = + key("table.statistics.columns") + .stringType() + .defaultValue("*") + .withDescription( + "Configures statistics collection for the table. " + + "Empty string ('') (default) means disable statistics collection completely. " + + "The value '*' means collect statistics for all non-binary columns. " Review Comment: `TABLE_STATISTICS_COLUMNS` sets `defaultValue("*")`, but the description says "Empty string ('') (default) means disable". Either the default should be "" or the description should be updated. As written, the description will mislead users, because statistics collection is actually enabled by default. ```suggestion + "Empty string ('') means disable statistics collection completely. " + "The value '*' (default) means collect statistics for all non-binary columns. " ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
