yunfengzhou-hub commented on code in PR #6858:
URL: https://github.com/apache/paimon/pull/6858#discussion_r2647934475


##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, 
fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from 
the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);

Review Comment:
   I'm a little concerned about the correctness risk due to object reuse. We 
can see that in this project, JoinedRow#replace is mainly used in two ways
   
   * `new JoinedRow().replace`: no reuse for this class.
   * `joinedRow.replace` soon followed by a serializer or writer, which means 
there is no more modification to this  row after reused.
   
   But here, the joined row will be output by the source to downstream 
operators, whose operations might modify the row in place and have 
`KeyValueSystemFieldsRecordReader` and that operator affects each other.
   
   You can refer to online blogs and stackoverflow questions about the 
correctness risk of Flink object reuse mechanism, and check whether it is 
acceptable to enable object reuse here.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, 
fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from 
the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);
+            joinedRow.setRowKind(kv.valueKind());
+
+            // If projection is provided, reorder to match requested order
+            if (projectedRow != null) {
+                return projectedRow.replaceRow(joinedRow);
+            }
+            return joinedRow;
+        }
+
+        @Override
+        public void releaseBatch() {
+            kvIterator.releaseBatch();
+        }
+    }
+
+    /**
+     * Wraps a KeyValue reader with system field injection if needed.
+     *
+     * @param reader the KeyValue reader
+     * @param systemFieldExtractors extractors for system fields (empty if no 
system fields needed)
+     * @param projection optional projection to reorder fields from natural 
order [system fields...
+     *     + physical fields...] to desired order
+     * @return a reader producing InternalRow with system fields, or a simple 
unwrapped reader if no
+     *     system fields
+     */
+    public static RecordReader<InternalRow> wrap(
+            RecordReader<KeyValue> reader,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        if (systemFieldExtractors.isEmpty()) {
+            // No system fields, use the default unwrap logic
+            return KeyValueTableRead.unwrap(reader);

Review Comment:
   It might be better to merge this method into `KeyValueTableRead#unwrap`. As 
a decorator/wrapper to this method, extending the original method might be more 
beneficial to extensibility.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, 
fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from 
the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);
+            joinedRow.setRowKind(kv.valueKind());
+
+            // If projection is provided, reorder to match requested order
+            if (projectedRow != null) {
+                return projectedRow.replaceRow(joinedRow);
+            }
+            return joinedRow;
+        }
+
+        @Override
+        public void releaseBatch() {
+            kvIterator.releaseBatch();
+        }
+    }
+
+    /**
+     * Wraps a KeyValue reader with system field injection if needed.
+     *
+     * @param reader the KeyValue reader
+     * @param systemFieldExtractors extractors for system fields (empty if no 
system fields needed)
+     * @param projection optional projection to reorder fields from natural 
order [system fields...
+     *     + physical fields...] to desired order
+     * @return a reader producing InternalRow with system fields, or a simple 
unwrapped reader if no
+     *     system fields
+     */
+    public static RecordReader<InternalRow> wrap(
+            RecordReader<KeyValue> reader,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        if (systemFieldExtractors.isEmpty()) {
+            // No system fields, use the default unwrap logic
+            return KeyValueTableRead.unwrap(reader);
+        }
+        return new KeyValueSystemFieldsRecordReader(reader, 
systemFieldExtractors, projection);
+    }
+
+    // ========== Internal Extractor Interface ==========
+
+    /**
+     * Internal interface for extracting system fields from {@link KeyValue} 
objects.
+     *
+     * <p>System fields are metadata fields like {@code _SEQUENCE_NUMBER}, 
{@code _LEVEL}, {@code
+     * rowkind} that are derived from the KeyValue container itself rather 
than the stored data.
+     *
+     * <p><b>Note:</b> This interface is specifically for KeyValue-based 
extraction. For InternalRow
+     * readers (e.g., RawFileSplitRead), system fields are handled differently 
in {@link
+     * org.apache.paimon.io.DataFileRecordReader} using file metadata.
+     *
+     * <p>All field definitions are sourced from {@link SpecialFields} to 
maintain consistency
+     * across the codebase.
+     *
+     * <p>Each extractor is stateless and thread-safe.
+     */
+    @FunctionalInterface
+    public interface SystemFieldExtractor {
+
+        /**
+         * Extracts the system field value from a KeyValue object.
+         *
+         * @param kv the KeyValue to extract from
+         * @return the extracted value, or null if not applicable
+         */
+        @Nullable
+        Object extract(KeyValue kv);
+
+        // ========== Built-in Extractors ==========
+
+        /**
+         * Extractor for {@code _SEQUENCE_NUMBER} system field.
+         *
+         * <p>Extracts the sequence number from KeyValue metadata.
+         */
+        SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();

Review Comment:
   ```suggestion
           SystemFieldExtractor SEQUENCE_NUMBER = KeyValue::sequenceNumber;
   ```



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, 
fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from 
the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);
+            joinedRow.setRowKind(kv.valueKind());
+
+            // If projection is provided, reorder to match requested order
+            if (projectedRow != null) {
+                return projectedRow.replaceRow(joinedRow);
+            }
+            return joinedRow;
+        }
+
+        @Override
+        public void releaseBatch() {
+            kvIterator.releaseBatch();
+        }
+    }
+
+    /**
+     * Wraps a KeyValue reader with system field injection if needed.
+     *
+     * @param reader the KeyValue reader
+     * @param systemFieldExtractors extractors for system fields (empty if no 
system fields needed)
+     * @param projection optional projection to reorder fields from natural 
order [system fields...
+     *     + physical fields...] to desired order
+     * @return a reader producing InternalRow with system fields, or a simple 
unwrapped reader if no
+     *     system fields
+     */
+    public static RecordReader<InternalRow> wrap(
+            RecordReader<KeyValue> reader,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        if (systemFieldExtractors.isEmpty()) {
+            // No system fields, use the default unwrap logic
+            return KeyValueTableRead.unwrap(reader);
+        }
+        return new KeyValueSystemFieldsRecordReader(reader, 
systemFieldExtractors, projection);
+    }
+
+    // ========== Internal Extractor Interface ==========
+
+    /**
+     * Internal interface for extracting system fields from {@link KeyValue} 
objects.
+     *
+     * <p>System fields are metadata fields like {@code _SEQUENCE_NUMBER}, 
{@code _LEVEL}, {@code
+     * rowkind} that are derived from the KeyValue container itself rather 
than the stored data.
+     *
+     * <p><b>Note:</b> This interface is specifically for KeyValue-based 
extraction. For InternalRow
+     * readers (e.g., RawFileSplitRead), system fields are handled differently 
in {@link
+     * org.apache.paimon.io.DataFileRecordReader} using file metadata.
+     *
+     * <p>All field definitions are sourced from {@link SpecialFields} to 
maintain consistency
+     * across the codebase.
+     *
+     * <p>Each extractor is stateless and thread-safe.
+     */
+    @FunctionalInterface
+    public interface SystemFieldExtractor {
+
+        /**
+         * Extracts the system field value from a KeyValue object.
+         *
+         * @param kv the KeyValue to extract from
+         * @return the extracted value, or null if not applicable
+         */
+        @Nullable
+        Object extract(KeyValue kv);
+
+        // ========== Built-in Extractors ==========
+
+        /**
+         * Extractor for {@code _SEQUENCE_NUMBER} system field.
+         *
+         * <p>Extracts the sequence number from KeyValue metadata.
+         */
+        SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
+
+        /**
+         * Extractor for {@code rowkind} system field (used in AuditLogTable).
+         *
+         * <p>Extracts the row kind from KeyValue's valueKind.
+         */
+        SystemFieldExtractor ROW_KIND = kv -> 
BinaryString.fromString(kv.valueKind().shortString());
+
+        /**
+         * Extractor for {@code _LEVEL} system field (LSM tree level).
+         *
+         * <p>Note: Currently not extractable from KeyValue at read time. This 
is a placeholder for
+         * future implementation where level information would need to be 
tracked through the read
+         * path.
+         */
+        SystemFieldExtractor LEVEL = kv -> null; // TODO: Level information 
needs to be propagated

Review Comment:
   If this system field cannot be extracted for now, I would prefer to direct 
throw exceptions in such cases, instead of giving users a false assumption that 
it could work, but only gets null values.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java:
##########
@@ -76,6 +80,17 @@ public SplitRead<InternalRow> withIOManager(@Nullable 
IOManager ioManager) {
     @Override
     public SplitRead<InternalRow> withReadType(RowType readType) {
         this.readType = readType;
+
+        List<DataField> fieldsWithAllSchema = new ArrayList<>();
+        for (DataField field : readType.getFields()) {
+            if (SpecialFields.isSystemField(field.name())) {
+                fieldsWithAllSchema.add(field);
+            }
+        }
+        fieldsWithAllSchema.addAll(mergeRead.tableSchema().fields());

Review Comment:
   The name `fieldsWithReorderedSchema` might need to be improved for better 
readability.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java:
##########
@@ -75,7 +74,8 @@ private SplitRead<InternalRow> 
create(Supplier<MergeFileSplitRead> supplier) {
                                                     dataSplit.dataFiles(),
                                                     
dataSplit.deletionFiles().orElse(null),
                                                     false));
-                    return unwrap(reader);
+                    return KeyValueSystemFieldsRecordReader.wrap(
+                            reader, read.getSystemFieldExtractors(), 
read.getProjection());

Review Comment:
   LookupCompactDiffRead still uses unwrap. Does it also need to change to this 
method?



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, 
fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from 
the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);
+            joinedRow.setRowKind(kv.valueKind());
+
+            // If projection is provided, reorder to match requested order
+            if (projectedRow != null) {
+                return projectedRow.replaceRow(joinedRow);
+            }
+            return joinedRow;
+        }
+
+        @Override
+        public void releaseBatch() {
+            kvIterator.releaseBatch();
+        }
+    }
+
+    /**
+     * Wraps a KeyValue reader with system field injection if needed.
+     *
+     * @param reader the KeyValue reader
+     * @param systemFieldExtractors extractors for system fields (empty if no 
system fields needed)
+     * @param projection optional projection to reorder fields from natural 
order [system fields...
+     *     + physical fields...] to desired order
+     * @return a reader producing InternalRow with system fields, or a simple 
unwrapped reader if no
+     *     system fields
+     */
+    public static RecordReader<InternalRow> wrap(
+            RecordReader<KeyValue> reader,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        if (systemFieldExtractors.isEmpty()) {
+            // No system fields, use the default unwrap logic
+            return KeyValueTableRead.unwrap(reader);
+        }
+        return new KeyValueSystemFieldsRecordReader(reader, 
systemFieldExtractors, projection);
+    }
+
+    // ========== Internal Extractor Interface ==========
+
+    /**
+     * Internal interface for extracting system fields from {@link KeyValue} 
objects.
+     *
+     * <p>System fields are metadata fields like {@code _SEQUENCE_NUMBER}, 
{@code _LEVEL}, {@code
+     * rowkind} that are derived from the KeyValue container itself rather 
than the stored data.
+     *
+     * <p><b>Note:</b> This interface is specifically for KeyValue-based 
extraction. For InternalRow
+     * readers (e.g., RawFileSplitRead), system fields are handled differently 
in {@link
+     * org.apache.paimon.io.DataFileRecordReader} using file metadata.
+     *
+     * <p>All field definitions are sourced from {@link SpecialFields} to 
maintain consistency
+     * across the codebase.
+     *
+     * <p>Each extractor is stateless and thread-safe.
+     */
+    @FunctionalInterface
+    public interface SystemFieldExtractor {
+
+        /**
+         * Extracts the system field value from a KeyValue object.
+         *
+         * @param kv the KeyValue to extract from
+         * @return the extracted value, or null if not applicable
+         */
+        @Nullable
+        Object extract(KeyValue kv);
+
+        // ========== Built-in Extractors ==========
+
+        /**
+         * Extractor for {@code _SEQUENCE_NUMBER} system field.
+         *
+         * <p>Extracts the sequence number from KeyValue metadata.
+         */
+        SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
+
+        /**
+         * Extractor for {@code rowkind} system field (used in AuditLogTable).
+         *
+         * <p>Extracts the row kind from KeyValue's valueKind.
+         */
+        SystemFieldExtractor ROW_KIND = kv -> 
BinaryString.fromString(kv.valueKind().shortString());
+
+        /**
+         * Extractor for {@code _LEVEL} system field (LSM tree level).
+         *
+         * <p>Note: Currently not extractable from KeyValue at read time. This 
is a placeholder for
+         * future implementation where level information would need to be 
tracked through the read
+         * path.
+         */
+        SystemFieldExtractor LEVEL = kv -> null; // TODO: Level information 
needs to be propagated
+
+        /**
+         * Extractor for {@code _ROW_ID} system field.
+         *
+         * <p>Note: ROW_ID is typically handled by DataFileRecordReader for 
InternalRow-based
+         * readers. This extractor is provided for completeness but may not be 
used in KeyValue
+         * scenarios.
+         */
+        SystemFieldExtractor ROW_ID =
+                kv -> null; // ROW_ID is computed from file metadata, not 
available in KeyValue

Review Comment:
   Same as above



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala:
##########
@@ -87,7 +88,8 @@ trait BaseScan extends Scan with SupportsReportStatistics 
with Logging {
     }
 
     if (ROW_TRACKING_META_COLUMNS.contains(fieldName)) {
-      if (!coreOptions.rowTrackingEnabled()) {
+      val isSystemTable = table.isInstanceOf[AuditLogTable] || 
table.isInstanceOf[BinlogTable]

Review Comment:
   System tables are more than audit log table and binlog table.



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -722,19 +699,14 @@ public void setRowKind(RowKind kind) {
 
         @Override
         public boolean isNullAt(int pos) {
-            if (indexMapping[pos] < 0) {
-                // row kind is always not null
-                return false;
-            }
-            return super.isNullAt(pos);
+            return pos == rowkindPos ? false : super.isNullAt(pos);
         }
 
         @Override
         public BinaryString getString(int pos) {
-            if (indexMapping[pos] < 0) {
-                return BinaryString.fromString(row.getRowKind().shortString());
-            }
-            return super.getString(pos);
+            return pos == rowkindPos
+                    ? BinaryString.fromString(getRowKind().shortString())

Review Comment:
   If rowkind can be covered by SystemFieldExtractor now, do we still need this?



##########
paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java:
##########
@@ -97,6 +99,11 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
     @Nullable private int[][] outerProjection;
     @Nullable private VariantAccessInfo[] variantAccess;
 
+    private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> 
systemFieldExtractors =
+            Collections.emptyList();
+
+    @Nullable private int[] projection = null;

Review Comment:
   There are now at least three variables associated with "projection" in this 
class now
   * pushdownProjection
   * outerProjection
   * projection
   
   It might be better to merge or clarify between these projections, so as to 
improve readability.



##########
paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java:
##########
@@ -408,4 +428,68 @@ public UserDefinedSeqComparator createUdsComparator() {
         return UserDefinedSeqComparator.create(
                 readerFactoryBuilder.readValueType(), sequenceFields, 
sequenceOrder);
     }
+
+    /**
+     * Collects system field extractors for the requested read type.
+     *
+     * @param readType the requested read type (may contain system fields)
+     * @return list of extractors for system fields present in readType
+     */
+    private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor>
+            collectSystemFieldExtractors(RowType readType) {
+        if (readType == null) {

Review Comment:
   It might be impossible for this condition to be met, otherwise other codes 
should have already thrown NPEs.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for

Review Comment:
   A decorator pattern should have the same wrapped type and output type, but 
here the wrapped is `RecordReader<KeyValue>`, and the output is 
`RecordReader<InternalRow>`. It might be more proper to call this class a 
wrapper rather than a decorator.



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -696,17 +666,24 @@ public RecordReader<InternalRow> createReader(Split 
split) throws IOException {
             return dataRead.createReader(split).transform(this::convertRow);
         }
 
-        private InternalRow convertRow(InternalRow data) {
-            return new AuditLogRow(readProjection, data);
+        private InternalRow convertRow(InternalRow row) {
+            row.setRowKind(org.apache.paimon.types.RowKind.INSERT);
+            // Fallback for append-only tables: rowkind field is null, wrap to 
return "+I"

Review Comment:
   `row.setRowKind(org.apache.paimon.types.RowKind.INSERT)` is placed above the 
comment and if condition, which seems that it will always return +I, regardless 
of whether it is append-only table.



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala:
##########
@@ -289,7 +289,7 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
       // after files:  f(-D 999),   f(+I 10001,10002)
       checkAnswer(
         sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 
'tag2') ORDER BY id"),
-        Seq(Row("-D", 999)))
+        Seq(Row("-D", 100002L, 999)))

Review Comment:
   100002L is the value of `id` column, rather than sequence number. Not sure 
whether it is correct here.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {

Review Comment:
   According to the existing `SpecialFields` class, Might be better to name 
them as `KeyValueSpecialFieldsRecordReader` and `SpecialFieldExtractor`.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.utils.ProjectedRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A decorator for {@link RecordReader} that injects system fields into the 
output rows for
+ * KeyValue-based data sources.
+ *
+ * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
+ * RecordReader<InternalRow>} with additional system fields extracted from the 
KeyValue metadata.
+ *
+ * <p><b>Naming:</b> This class is specifically designed for KeyValue format 
data (e.g.,
+ * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), 
system fields are handled
+ * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file 
metadata.
+ *
+ * <p><b>Field Ordering:</b> The output schema supports arbitrary field 
ordering. Internally, fields
+ * are assembled as [system fields... + physical fields...], then reordered 
using {@link
+ * ProjectedRow} to match the requested field order.
+ *
+ * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy 
concatenation of
+ * system fields and physical fields, then {@link ProjectedRow} for zero-copy 
field reordering.
+ */
+public class KeyValueSystemFieldsRecordReader implements 
RecordReader<InternalRow> {
+
+    private final RecordReader<KeyValue> wrapped;
+    private final List<SystemFieldExtractor> systemFieldExtractors;
+    @Nullable private final int[] projection;
+
+    /**
+     * Creates a KeyValueSystemFieldsRecordReader.
+     *
+     * @param wrapped the underlying KeyValue reader
+     * @param systemFieldExtractors extractors for system fields, in order
+     * @param projection optional projection to reorder fields. If null, 
fields are in natural order
+     *     [system fields... + physical fields...]. If provided, projects from 
the natural order to
+     *     the desired order.
+     */
+    public KeyValueSystemFieldsRecordReader(
+            RecordReader<KeyValue> wrapped,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        this.wrapped = wrapped;
+        this.systemFieldExtractors = systemFieldExtractors;
+        this.projection = projection;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<InternalRow> readBatch() throws IOException {
+        RecordIterator<KeyValue> batch = wrapped.readBatch();
+        if (batch == null) {
+            return null;
+        }
+        return new SystemFieldsRecordIterator(batch);
+    }
+
+    @Override
+    public void close() throws IOException {
+        wrapped.close();
+    }
+
+    private class SystemFieldsRecordIterator implements 
RecordIterator<InternalRow> {
+
+        private final RecordIterator<KeyValue> kvIterator;
+        private final JoinedRow joinedRow;
+        private final GenericRow systemFieldsRow;
+        @Nullable private final ProjectedRow projectedRow;
+
+        private SystemFieldsRecordIterator(RecordIterator<KeyValue> 
kvIterator) {
+            this.kvIterator = kvIterator;
+            this.joinedRow = new JoinedRow();
+            this.systemFieldsRow = new 
GenericRow(systemFieldExtractors.size());
+            // If projection is provided, use ProjectedRow to reorder fields
+            this.projectedRow = projection != null ? 
ProjectedRow.from(projection) : null;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            KeyValue kv = kvIterator.next();
+            if (kv == null) {
+                return null;
+            }
+
+            InternalRow value = kv.value();
+
+            // Extract system fields into the reusable row
+            for (int i = 0; i < systemFieldExtractors.size(); i++) {
+                SystemFieldExtractor extractor = systemFieldExtractors.get(i);
+                Object systemValue = extractor.extract(kv);
+                systemFieldsRow.setField(i, systemValue);
+            }
+
+            // Join system fields first, then physical fields
+            // Natural order: [system fields...] + [physical fields...]
+            joinedRow.replace(systemFieldsRow, value);
+            joinedRow.setRowKind(kv.valueKind());
+
+            // If projection is provided, reorder to match requested order
+            if (projectedRow != null) {
+                return projectedRow.replaceRow(joinedRow);
+            }
+            return joinedRow;
+        }
+
+        @Override
+        public void releaseBatch() {
+            kvIterator.releaseBatch();
+        }
+    }
+
+    /**
+     * Wraps a KeyValue reader with system field injection if needed.
+     *
+     * @param reader the KeyValue reader
+     * @param systemFieldExtractors extractors for system fields (empty if no 
system fields needed)
+     * @param projection optional projection to reorder fields from natural 
order [system fields...
+     *     + physical fields...] to desired order
+     * @return a reader producing InternalRow with system fields, or a simple 
unwrapped reader if no
+     *     system fields
+     */
+    public static RecordReader<InternalRow> wrap(
+            RecordReader<KeyValue> reader,
+            List<SystemFieldExtractor> systemFieldExtractors,
+            @Nullable int[] projection) {
+        if (systemFieldExtractors.isEmpty()) {
+            // No system fields, use the default unwrap logic
+            return KeyValueTableRead.unwrap(reader);
+        }
+        return new KeyValueSystemFieldsRecordReader(reader, 
systemFieldExtractors, projection);
+    }
+
+    // ========== Internal Extractor Interface ==========
+
+    /**
+     * Internal interface for extracting system fields from {@link KeyValue} 
objects.
+     *
+     * <p>System fields are metadata fields like {@code _SEQUENCE_NUMBER}, 
{@code _LEVEL}, {@code
+     * rowkind} that are derived from the KeyValue container itself rather 
than the stored data.
+     *
+     * <p><b>Note:</b> This interface is specifically for KeyValue-based 
extraction. For InternalRow
+     * readers (e.g., RawFileSplitRead), system fields are handled differently 
in {@link
+     * org.apache.paimon.io.DataFileRecordReader} using file metadata.
+     *
+     * <p>All field definitions are sourced from {@link SpecialFields} to 
maintain consistency
+     * across the codebase.
+     *
+     * <p>Each extractor is stateless and thread-safe.
+     */
+    @FunctionalInterface
+    public interface SystemFieldExtractor {
+
+        /**
+         * Extracts the system field value from a KeyValue object.
+         *
+         * @param kv the KeyValue to extract from
+         * @return the extracted value, or null if not applicable
+         */
+        @Nullable
+        Object extract(KeyValue kv);
+
+        // ========== Built-in Extractors ==========
+
+        /**
+         * Extractor for {@code _SEQUENCE_NUMBER} system field.
+         *
+         * <p>Extracts the sequence number from KeyValue metadata.
+         */
+        SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
+
+        /**
+         * Extractor for {@code rowkind} system field (used in AuditLogTable).
+         *
+         * <p>Extracts the row kind from KeyValue's valueKind.
+         */
+        SystemFieldExtractor ROW_KIND = kv -> 
BinaryString.fromString(kv.valueKind().shortString());
+
+        /**
+         * Extractor for {@code _LEVEL} system field (LSM tree level).
+         *
+         * <p>Note: Currently not extractable from KeyValue at read time. This 
is a placeholder for
+         * future implementation where level information would need to be 
tracked through the read
+         * path.
+         */
+        SystemFieldExtractor LEVEL = kv -> null; // TODO: Level information 
needs to be propagated
+
+        /**
+         * Extractor for {@code _ROW_ID} system field.
+         *
+         * <p>Note: ROW_ID is typically handled by DataFileRecordReader for 
InternalRow-based
+         * readers. This extractor is provided for completeness but may not be 
used in KeyValue
+         * scenarios.
+         */
+        SystemFieldExtractor ROW_ID =
+                kv -> null; // ROW_ID is computed from file metadata, not 
available in KeyValue
+    }
+
+    // ========== Registry ==========
+
+    /** Registry for system field extractors. */
+    private static final Map<String, SystemFieldExtractor> EXTRACTOR_REGISTRY 
= new HashMap<>();
+
+    static {
+        // Register all extractors that can be used with KeyValue
+        EXTRACTOR_REGISTRY.put(
+                SpecialFields.SEQUENCE_NUMBER.name(), 
SystemFieldExtractor.SEQUENCE_NUMBER);
+        EXTRACTOR_REGISTRY.put(SpecialFields.ROW_KIND.name(), 
SystemFieldExtractor.ROW_KIND);
+        EXTRACTOR_REGISTRY.put(SpecialFields.LEVEL.name(), 
SystemFieldExtractor.LEVEL);
+        EXTRACTOR_REGISTRY.put(SpecialFields.ROW_ID.name(), 
SystemFieldExtractor.ROW_ID);
+    }
+
+    /**
+     * Gets an extractor by field name.
+     *
+     * @param fieldName the system field name
+     * @return the extractor, or null if not a registered system field
+     */
+    @Nullable
+    public static SystemFieldExtractor getExtractor(String fieldName) {
+        return EXTRACTOR_REGISTRY.get(fieldName);

Review Comment:
   This method only checks field names. It might be better to also check field 
data types.



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -142,6 +144,9 @@ public String name() {
     public RowType rowType() {
         List<DataField> fields = new ArrayList<>();
         fields.add(SpecialFields.ROW_KIND);
+        fields.add(
+                SpecialFields.SEQUENCE_NUMBER.newType(
+                        SpecialFields.SEQUENCE_NUMBER.type().nullable()));

Review Comment:
   Do you think it would be better if we only add sequence number to 
primary-key tables, not append-only tables, and make this field notnull()?



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -722,19 +699,14 @@ public void setRowKind(RowKind kind) {
 
         @Override
         public boolean isNullAt(int pos) {
-            if (indexMapping[pos] < 0) {
-                // row kind is always not null
-                return false;
-            }
-            return super.isNullAt(pos);
+            return pos == rowkindPos ? false : super.isNullAt(pos);

Review Comment:
   Better keep the previous code structure and comment.



##########
paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java:
##########
@@ -48,11 +56,176 @@
 /** Unit tests for {@link AuditLogTable}. */
 public class AuditLogTableTest extends TableTestBase {
 
-    private static final String tableName = "MyTable";
+    private FileStoreTable baseTable;
     private AuditLogTable auditLogTable;
 
     @BeforeEach
     public void before() throws Exception {
+        baseTable = createTableWithData("AuditLogTestTable");
+        Identifier auditLogId =
+                identifier(baseTable.name() + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
+        auditLogTable = (AuditLogTable) catalog.getTable(auditLogId);
+    }
+
+    @Test
+    public void testBatchReadWithSystemFields() throws Exception {
+        List<InternalRow> result = read(auditLogTable);
+
+        // Verify we get 4 records (after compaction: 1 DELETE, 1 
UPDATE_BEFORE, 1 UPDATE_AFTER, 1
+        // INSERT)
+        assertThat(result).hasSize(4);
+
+        // Verify all rows have correct system fields (ROW_KIND at index 0, 
SEQUENCE_NUMBER at
+        // index 1)
+        for (InternalRow row : result) {
+            // ROW_KIND should be a BinaryString
+            assertThat(row.getString(0)).isNotNull();
+            // SEQUENCE_NUMBER should be a Long
+            assertThat(row.getLong(1)).isGreaterThanOrEqualTo(0L);
+        }
+
+        // Verify specific records
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        // DELETE: pk=1, pt=1, col1=1, seq=1
+                        GenericRow.of(
+                                
BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1, 1),
+                        // UPDATE_BEFORE: pk=1, pt=2, col1=5, seq=1
+                        GenericRow.of(
+                                
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()),
+                                1L,
+                                1,
+                                2,
+                                5),
+                        // UPDATE_AFTER: pk=1, pt=4, col1=6, seq=0 (new data)
+                        GenericRow.of(
+                                
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+                                0L,
+                                1,
+                                4,
+                                6),
+                        // INSERT: pk=2, pt=3, col1=1, seq=0
+                        GenericRow.of(
+                                
BinaryString.fromString(RowKind.INSERT.shortString()),
+                                0L,
+                                2,
+                                3,
+                                1));
+    }
+
+    @Test
+    public void testReadWithPartialFieldsAndOutOfOrder() throws Exception {
+        // Test reading with partial fields in a different order than table 
schema
+        // AuditLogTable schema: [rowkind, _SEQUENCE_NUMBER, pk, pt, col1]
+        // We want to read: [pt, rowkind, col1] (out of order, partial fields)
+
+        List<InternalRow> result = readWithCustomProjection(auditLogTable);
+
+        // Verify we get 4 records
+        assertThat(result).hasSize(4);
+
+        // Verify field ordering: [pt, rowkind, col1]
+        for (InternalRow row : result) {
+            // Index 0: pt (INT)
+            assertThat(row.getInt(0)).isGreaterThan(0);
+            // Index 1: rowkind (STRING)
+            assertThat(row.getString(1)).isNotNull();
+            // Index 2: col1 (INT)
+            assertThat(row.getInt(2)).isGreaterThan(0);
+        }
+
+        // Verify specific records with the new field order
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        // pt=1, rowkind=DELETE, col1=1
+                        GenericRow.of(1, 
BinaryString.fromString(RowKind.DELETE.shortString()), 1),
+                        // pt=2, rowkind=UPDATE_BEFORE, col1=5
+                        GenericRow.of(
+                                2, 
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 5),
+                        // pt=4, rowkind=UPDATE_AFTER, col1=6
+                        GenericRow.of(
+                                4, 
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 6),
+                        // pt=3, rowkind=INSERT, col1=1
+                        GenericRow.of(3, 
BinaryString.fromString(RowKind.INSERT.shortString()), 1));
+    }
+
+    /**
+     * Helper method to read with custom field projection in a specific order. 
Reads fields [pt,
+     * rowkind, col1] from AuditLogTable.
+     */
+    private List<InternalRow> readWithCustomProjection(AuditLogTable table) 
throws Exception {
+        RowType tableRowType = auditLogTable.rowType();
+        List<DataField> customFields =
+                Arrays.asList(
+                        tableRowType.getField("pt"),
+                        tableRowType.getField("rowkind"),
+                        tableRowType.getField("col1"));
+        RowType customReadType = new RowType(customFields);
+
+        ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(customReadType);
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new 
InternalRowSerializer(customReadType);
+
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+        return rows;
+    }
+
+    @Test
+    public void testAppendOnlyTableWithRowKind() throws Exception {
+        FileStoreTable appendTable = 
createAppendOnlyTable("AppendOnlyAuditTest");
+
+        TableWriteImpl<?> write = appendTable.newWrite("user0");
+        StreamTableCommit commit = appendTable.newCommit("user0");
+
+        write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 100L));
+        write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 200L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.close();
+        commit.close();
+
+        AuditLogTable auditLog = new AuditLogTable(appendTable);
+        List<InternalRow> result = read(auditLog);
+
+        // Verify specific records: [rowkind, seq, id, name, amount]
+        // Note: append-only table has null sequence number
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(
+                                BinaryString.fromString("+I"),
+                                null,
+                                1,
+                                BinaryString.fromString("Alice"),
+                                100L),
+                        GenericRow.of(
+                                BinaryString.fromString("+I"),
+                                null,
+                                2,
+                                BinaryString.fromString("Bob"),
+                                200L));
+    }
+
+    // ==================== Helper Methods ====================
+
+    /** Creates a FileStoreTable with changelog producer and writes test data. 
*/
+    private FileStoreTable createTableWithData(String tableName) throws 
Exception {
+        FileStoreTable table = createChangelogTable(tableName);
+
+        // Write test data with different RowKinds
+        write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1)); // Will be 
deleted
+        write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
+        write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5)); // Will be 
updated
+        write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+        write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1)); // Remains 
as insert
+
+        return table;
+    }
+
+    /** Creates a table with changelog producer enabled. */
+    private FileStoreTable createChangelogTable(String tableName) throws 
Exception {

Review Comment:
   This method is only used in one place. So compared with introducing this 
method, it might be better to preserve the original code structure and commit 
history.



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -722,19 +699,14 @@ public void setRowKind(RowKind kind) {
 
         @Override
         public boolean isNullAt(int pos) {
-            if (indexMapping[pos] < 0) {
-                // row kind is always not null
-                return false;
-            }
-            return super.isNullAt(pos);
+            return pos == rowkindPos ? false : super.isNullAt(pos);

Review Comment:
   Only rowkindPos? not sequenceNumberPos?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to