platinumhamburg commented on code in PR #2485: URL: https://github.com/apache/fluss/pull/2485#discussion_r2744277090
########## fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.decode.KeyDecoder; +import org.apache.fluss.row.encode.RowEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; + +/** + * A lazy-decoding KvRecordBatch that stores key bytes and converts them to KvRecords on-demand + * during iteration. + * + * <p>When iterating via {@link #records(ReadContext)}, each key is decoded and a full row is + * constructed with key fields populated and non-key fields set to null. 
This avoids upfront + * full-batch decoding, improving memory efficiency. + */ +public class KeyRecordBatch implements KvRecordBatch { + + private final List<byte[]> keyBytes; + + /** + * Creates a KeyRecordBatch for lazy KvRecord construction. + * + * @param keyBytes the list of encoded key bytes + */ + public KeyRecordBatch(List<byte[]> keyBytes) { + this.keyBytes = keyBytes; + } + + @Override + public Iterable<KvRecord> records(ReadContext readContext) { + return () -> new LazyKvRecordIterator(readContext); + } + + @Override + public int getRecordCount() { + return keyBytes.size(); + } + + @Override + public short schemaId() { + return 0; + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public void ensureValid() { + // In-memory batch is always valid + } + + @Override + public long checksum() { + return 0; + } + + @Override + public byte magic() { + return CURRENT_KV_MAGIC_VALUE; + } + + @Override + public long writerId() { + return NO_WRITER_ID; + } + + @Override + public int batchSequence() { + return NO_BATCH_SEQUENCE; + } + + @Override + public int sizeInBytes() { + return keyBytes.stream().mapToInt(k -> k.length).sum(); + } + + /** Lazy iterator that constructs KvRecord on-demand during iteration. 
*/ + private class LazyKvRecordIterator implements Iterator<KvRecord> { + private int index = 0; + + private final int fieldCount; + private final int[] keyFieldIndexes; + private final InternalRow.FieldGetter[] keyFieldGetters; + private final RowEncoder rowEncoder; + private final KeyDecoder keyDecoder; + + LazyKvRecordIterator(ReadContext context) { + SchemaGetter schemaGetter = context.getSchemaGetter(); + KvFormat kvFormat = context.getKvFormat(); + Schema schema = schemaGetter.getLatestSchemaInfo().getSchema(); + RowType rowType = schema.getRowType(); + List<String> keyColumnNames = schema.getPrimaryKeyColumnNames(); + + this.fieldCount = rowType.getFieldCount(); + this.keyFieldIndexes = schema.getPrimaryKeyIndexes(); + this.keyDecoder = KeyDecoder.of(rowType, keyColumnNames); + this.keyFieldGetters = new InternalRow.FieldGetter[keyColumnNames.size()]; + + // Create field getters for key fields (reading from decoded key row at position i) + for (int i = 0; i < keyColumnNames.size(); i++) { + DataType keyFieldType = rowType.getTypeAt(keyFieldIndexes[i]); + keyFieldGetters[i] = InternalRow.createFieldGetter(keyFieldType, i); + } + + this.rowEncoder = RowEncoder.create(kvFormat, rowType); + } + + @Override + public boolean hasNext() { + return index < keyBytes.size(); + } + + @Override + public KvRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + byte[] key = keyBytes.get(index++); + + InternalRow keyRow = keyDecoder.decodeKey(key); + + // Build full row: key fields from decoded values, non-key fields as null + rowEncoder.startNewRow(); + int keyIdx = 0; + for (int i = 0; i < fieldCount; i++) { + if (keyIdx < keyFieldIndexes.length && keyFieldIndexes[keyIdx] == i) { + // This is a key field - copy from decoded key row + Object value = keyFieldGetters[keyIdx].getFieldOrNull(keyRow); + rowEncoder.encodeField(i, value); + keyIdx++; + } else { + // Non-key field - set to null + rowEncoder.encodeField(i, null); + } + } Review Comment: 
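Please add unit tests for KeyRecordBatch. At minimum, iterate `records(ReadContext)` and verify that:
- each key is decoded into a full row whose key fields match the original values and whose non-key fields are null;
- `getRecordCount()` and `sizeInBytes()` reflect the input key list;
- `next()` throws `NoSuchElementException` once the iterator is exhausted.

Please also cover a composite primary key whose key columns are not declared in table-column order. The field-encoding loop only matches a key field when `keyFieldIndexes[keyIdx] == i`, so it appears to assume the primary-key indexes are ascending. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]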
