platinumhamburg commented on code in PR #2485:
URL: https://github.com/apache/fluss/pull/2485#discussion_r2744277090


##########
fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.decode.KeyDecoder;
+import org.apache.fluss.row.encode.RowEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
+
+/**
+ * A lazy-decoding KvRecordBatch that stores key bytes and converts them to KvRecords on-demand
+ * during iteration.
+ *
+ * <p>When iterating via {@link #records(ReadContext)}, each key is decoded and a full row is
+ * constructed with key fields populated and non-key fields set to null. This avoids upfront
+ * full-batch decoding, improving memory efficiency.
+ */
+public class KeyRecordBatch implements KvRecordBatch {
+
+    private final List<byte[]> keyBytes;
+
+    /**
+     * Creates a KeyRecordBatch for lazy KvRecord construction.
+     *
+     * @param keyBytes the list of encoded key bytes
+     */
+    public KeyRecordBatch(List<byte[]> keyBytes) {
+        this.keyBytes = keyBytes;
+    }
+
+    @Override
+    public Iterable<KvRecord> records(ReadContext readContext) {
+        return () -> new LazyKvRecordIterator(readContext);
+    }
+
+    @Override
+    public int getRecordCount() {
+        return keyBytes.size();
+    }
+
+    @Override
+    public short schemaId() {
+        return 0;
+    }
+
+    @Override
+    public boolean isValid() {
+        return true;
+    }
+
+    @Override
+    public void ensureValid() {
+        // In-memory batch is always valid
+    }
+
+    @Override
+    public long checksum() {
+        return 0;
+    }
+
+    @Override
+    public byte magic() {
+        return CURRENT_KV_MAGIC_VALUE;
+    }
+
+    @Override
+    public long writerId() {
+        return NO_WRITER_ID;
+    }
+
+    @Override
+    public int batchSequence() {
+        return NO_BATCH_SEQUENCE;
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return keyBytes.stream().mapToInt(k -> k.length).sum();
+    }
+
+    /** Lazy iterator that constructs KvRecord on-demand during iteration. */
+    private class LazyKvRecordIterator implements Iterator<KvRecord> {
+        private int index = 0;
+
+        private final int fieldCount;
+        private final int[] keyFieldIndexes;
+        private final InternalRow.FieldGetter[] keyFieldGetters;
+        private final RowEncoder rowEncoder;
+        private final KeyDecoder keyDecoder;
+
+        LazyKvRecordIterator(ReadContext context) {
+            SchemaGetter schemaGetter = context.getSchemaGetter();
+            KvFormat kvFormat = context.getKvFormat();
+            Schema schema = schemaGetter.getLatestSchemaInfo().getSchema();
+            RowType rowType = schema.getRowType();
+            List<String> keyColumnNames = schema.getPrimaryKeyColumnNames();
+
+            this.fieldCount = rowType.getFieldCount();
+            this.keyFieldIndexes = schema.getPrimaryKeyIndexes();
+            this.keyDecoder = KeyDecoder.of(rowType, keyColumnNames);
+            this.keyFieldGetters = new InternalRow.FieldGetter[keyColumnNames.size()];
+
+            // Create field getters for key fields (reading from decoded key row at position i)
+            for (int i = 0; i < keyColumnNames.size(); i++) {
+                DataType keyFieldType = rowType.getTypeAt(keyFieldIndexes[i]);
+                keyFieldGetters[i] = InternalRow.createFieldGetter(keyFieldType, i);
+            }
+
+            this.rowEncoder = RowEncoder.create(kvFormat, rowType);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return index < keyBytes.size();
+        }
+
+        @Override
+        public KvRecord next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            byte[] key = keyBytes.get(index++);
+
+            InternalRow keyRow = keyDecoder.decodeKey(key);
+
+            // Build full row: key fields from decoded values, non-key fields as null
+            rowEncoder.startNewRow();
+            int keyIdx = 0;
+            for (int i = 0; i < fieldCount; i++) {
+                if (keyIdx < keyFieldIndexes.length && keyFieldIndexes[keyIdx] == i) {
+                    // This is a key field - copy from decoded key row
+                    Object value = keyFieldGetters[keyIdx].getFieldOrNull(keyRow);
+                    rowEncoder.encodeField(i, value);
+                    keyIdx++;
+                } else {
+                    // Non-key field - set to null
+                    rowEncoder.encodeField(i, null);
+                }
+            }

Review Comment:
   Need to add tests for KeyRecordBatch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to