wzhero1 commented on code in PR #6858:
URL: https://github.com/apache/paimon/pull/6858#discussion_r2652123562


##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);

Review Comment:
   Great catch! You're right - downstream operator behavior is unpredictable 
   and could mutate the row in place. I've changed the code to allocate a 
   `new JoinedRow()` for each record instead of reusing a single instance. The 
   minor overhead of the extra object allocation is acceptable to ensure 
   correctness and avoid potential interference between 
   `KeyValueSystemFieldsRecordReader` and downstream operators.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to