wzhero1 commented on code in PR #6858: URL: https://github.com/apache/paimon/pull/6858#discussion_r2652123562
########## paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueSystemFieldsRecordReader.java: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.utils.ProjectedRow; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A decorator for {@link RecordReader} that injects system fields into the output rows for + * KeyValue-based data sources. + * + * <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code + * RecordReader<InternalRow>} with additional system fields extracted from the KeyValue metadata. + * + * <p><b>Naming:</b> This class is specifically designed for KeyValue format data (e.g., + * MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), system fields are handled + * differently in {@link org.apache.paimon.io.DataFileRecordReader} using file metadata. + * + * <p><b>Field Ordering:</b> The output schema supports arbitrary field ordering. Internally, fields + * are assembled as [system fields... + physical fields...], then reordered using {@link + * ProjectedRow} to match the requested field order. + * + * <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy concatenation of + * system fields and physical fields, then {@link ProjectedRow} for zero-copy field reordering. + */ +public class KeyValueSystemFieldsRecordReader implements RecordReader<InternalRow> { + + private final RecordReader<KeyValue> wrapped; + private final List<SystemFieldExtractor> systemFieldExtractors; + @Nullable private final int[] projection; + + /** + * Creates a KeyValueSystemFieldsRecordReader. + * + * @param wrapped the underlying KeyValue reader + * @param systemFieldExtractors extractors for system fields, in order + * @param projection optional projection to reorder fields. If null, fields are in natural order + * [system fields... + physical fields...]. If provided, projects from the natural order to + * the desired order. + */ + public KeyValueSystemFieldsRecordReader( + RecordReader<KeyValue> wrapped, + List<SystemFieldExtractor> systemFieldExtractors, + @Nullable int[] projection) { + this.wrapped = wrapped; + this.systemFieldExtractors = systemFieldExtractors; + this.projection = projection; + } + + @Nullable + @Override + public RecordIterator<InternalRow> readBatch() throws IOException { + RecordIterator<KeyValue> batch = wrapped.readBatch(); + if (batch == null) { + return null; + } + return new SystemFieldsRecordIterator(batch); + } + + @Override + public void close() throws IOException { + wrapped.close(); + } + + private class SystemFieldsRecordIterator implements RecordIterator<InternalRow> { + + private final RecordIterator<KeyValue> kvIterator; + private final JoinedRow joinedRow; + private final GenericRow systemFieldsRow; + @Nullable private final ProjectedRow projectedRow; + + private SystemFieldsRecordIterator(RecordIterator<KeyValue> kvIterator) { + this.kvIterator = kvIterator; + this.joinedRow = new JoinedRow(); + this.systemFieldsRow = new GenericRow(systemFieldExtractors.size()); + // If projection is provided, use ProjectedRow to reorder fields + this.projectedRow = projection != null ? ProjectedRow.from(projection) : null; + } + + @Nullable + @Override + public InternalRow next() throws IOException { + KeyValue kv = kvIterator.next(); + if (kv == null) { + return null; + } + + InternalRow value = kv.value(); + + // Extract system fields into the reusable row + for (int i = 0; i < systemFieldExtractors.size(); i++) { + SystemFieldExtractor extractor = systemFieldExtractors.get(i); + Object systemValue = extractor.extract(kv); + systemFieldsRow.setField(i, systemValue); + } + + // Join system fields first, then physical fields + // Natural order: [system fields...] + [physical fields...] + joinedRow.replace(systemFieldsRow, value); Review Comment: Great catch! You're right - downstream operator behavior is unpredictable and could mutate the row in-place. I've changed it to `new JoinedRow()`. The minor overhead of object allocation is acceptable to ensure correctness and avoid potential interference between `KeyValueSystemFieldsRecordReader` and downstream operators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
