JingsongLi commented on code in PR #5935:
URL: https://github.com/apache/paimon/pull/5935#discussion_r2235055032


##########
paimon-api/src/main/java/org/apache/paimon/CoreOptions.java:
##########
@@ -1911,6 +1911,11 @@ public InlineElement getDescription() {
                                     + "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
                                     + "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
                                     + "and 'hilbert' for 5 or more columns.");
+    public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =

Review Comment:
   Keep a blank line above.



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java:
##########
@@ -452,7 +475,31 @@ public DataFileMeta copyWithoutStats() {
                 embeddedIndex,
                 fileSource,
                 Collections.emptyList(),
-                externalPath);
+                externalPath,
+                firstRowId);
+    }
+
+    public DataFileMeta copyWithMaxSequenceNumber(long maxSequenceNumber) {

Review Comment:
   `assignSequenceNumber(min, max)`



##########
paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java:
##########
@@ -165,6 +175,16 @@ public FormatReaderMapping build(
 
             // extract the whole data fields in logic.
             List<DataField> allDataFields = fieldsExtractor.apply(dataSchema);
+            if (CoreOptions.fromMap(dataSchema.options()).rowTrackingEnabled()) {
+                allDataFields.add(SpecialFields.ROW_ID);
+                allDataFields.add(SpecialFields.SEQUENCE_NUMBER.copy(true));
+            }
+            List<DataField> metaDataFields =
+                    Arrays.asList(SpecialFields.ROW_ID, SpecialFields.SEQUENCE_NUMBER);
+            int[] metaMappings = createIndexMapping(allDataFields, metaDataFields);
+
+            Map<String, Integer> meta = findMeta(readTableFields);

Review Comment:
   systemFields



##########
paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java:
##########
@@ -67,13 +71,15 @@ public FormatReaderMapping(
             @Nullable Pair<int[], RowType> partitionPair,
             FormatReaderFactory readerFactory,
             TableSchema dataSchema,
-            List<Predicate> dataFilters) {
+            List<Predicate> dataFilters,
+            Map<String, Integer> meta) {

Review Comment:
   systemFields



##########
paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java:
##########
@@ -426,7 +427,10 @@ private static FunctionWithIOException<DataInputView, DataFileMeta> getFileMetaS
         } else if (version == 3 || version == 4) {
             DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer();
             return serializer::deserialize;
-        } else if (version >= 5) {
+        } else if (version == 5 || version == 6) {
+            DataFileMeta12LegacySerializer serializer = new DataFileMeta12LegacySerializer();

Review Comment:
   Add a test too.



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java:
##########
@@ -21,34 +21,51 @@
 import org.apache.paimon.PartitionSettedRow;
 import org.apache.paimon.casting.CastFieldGetter;
 import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.casting.FallbackMappingRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
 public class DataFileRecordReader implements FileRecordReader<InternalRow> {
 
+    private final RowType tableRowType;
     private final FileRecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
+    private final boolean rowLineageEnabled;
+    @Nullable private final Long firstRowId;
+    private final long snapshotId;
+    private final Map<String, Integer> metaColumnIndex;

Review Comment:
   systemFields



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java:
##########
@@ -124,6 +126,8 @@ public class DataFileMeta {
     /** external path of file, if it is null, it is in the default warehouse path. */
     private final @Nullable String externalPath;
 
+    private @Nullable Long firstRowId;

Review Comment:
   final



##########
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java:
##########
@@ -140,6 +140,15 @@ public ManifestEntry copyWithoutStats() {
         return new ManifestEntry(kind, partition, bucket, totalBuckets, file.copyWithoutStats());
     }
 
+    public ManifestEntry copyWithMaxSequenceNumber(long maxSequenceNumber) {

Review Comment:
   assignSequenceNumber



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java:
##########
@@ -81,6 +107,36 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {
                 final ProjectedRow projectedRow = ProjectedRow.from(indexMapping);
                 iterator = iterator.transform(projectedRow::replaceRow);
             }
+
+            if (rowLineageEnabled && !metaColumnIndex.isEmpty()) {
+                GenericRow lineageRow = new GenericRow(metaColumnIndex.size());
+
+                int[] fallbackToMetaRowLineageMappings = new int[tableRowType.getFieldCount()];

Review Comment:
   fallbackToLineageMappings



##########
paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java:
##########
@@ -196,7 +216,19 @@ public FormatReaderMapping build(
                             .discover(formatIdentifier)
                             .createReaderFactory(readRowType, readFilters),
                     dataSchema,
-                    readFilters);
+                    readFilters,
+                    meta);
+        }
+
+        private Map<String, Integer> findMeta(List<DataField> readTableFields) {

Review Comment:
   `findSystemFields`



##########
paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java:
##########
@@ -59,6 +62,7 @@ public class FormatReaderMapping {
     private final FormatReaderFactory readerFactory;
     private final TableSchema dataSchema;
     private final List<Predicate> dataFilters;
+    private final Map<String, Integer> meta;

Review Comment:
   systemFields



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java:
##########
@@ -275,6 +286,11 @@ public DataFileMeta(
         this.fileSource = fileSource;
         this.valueStatsCols = valueStatsCols;
         this.externalPath = externalPath;
+        this.firstRowId = firstRowId;
+    }
+
+    public void setFirstRowId(@Nullable Long firstRowId) {

Review Comment:
   `assignFirstRowId`



##########
paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java:
##########
@@ -111,6 +117,10 @@ public Pair<int[], RowType> getPartitionPair() {
         return partitionPair;
     }
 
+    public Map<String, Integer> getMeta() {

Review Comment:
   getSystemFields



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java:
##########
@@ -21,34 +21,51 @@
 import org.apache.paimon.PartitionSettedRow;
 import org.apache.paimon.casting.CastFieldGetter;
 import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.casting.FallbackMappingRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
 public class DataFileRecordReader implements FileRecordReader<InternalRow> {
 
+    private final RowType tableRowType;
     private final FileRecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
+    private final boolean rowLineageEnabled;
+    @Nullable private final Long firstRowId;
+    private final long snapshotId;
+    private final Map<String, Integer> metaColumnIndex;
 
     public DataFileRecordReader(
+            RowType tableRowType,
             FormatReaderFactory readerFactory,
             FormatReaderFactory.Context context,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
+            @Nullable PartitionInfo partitionInfo,
+            boolean rowLineageEnabled,
+            @Nullable Long firstRowId,
+            long snapshotId,
+            Map<String, Integer> metaColumnIndex)

Review Comment:
   systemFields



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java:
##########
@@ -21,34 +21,51 @@
 import org.apache.paimon.PartitionSettedRow;
 import org.apache.paimon.casting.CastFieldGetter;
 import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.casting.FallbackMappingRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
 public class DataFileRecordReader implements FileRecordReader<InternalRow> {
 
+    private final RowType tableRowType;
     private final FileRecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
+    private final boolean rowLineageEnabled;
+    @Nullable private final Long firstRowId;
+    private final long snapshotId;

Review Comment:
   maxSequenceNumber



##########
paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java:
##########
@@ -76,7 +76,8 @@ public void testCompatibilityToV4CommitV7() throws IOException {
                         new byte[] {1, 2, 4},
                         FileSource.COMPACT,
                         Arrays.asList("field1", "field2", "field3"),
-                        "hdfs://localhost:9000/path/to/file");
+                        "hdfs://localhost:9000/path/to/file",
+                        null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
         LinkedHashMap<String, DeletionVectorMeta> dvMetas = new LinkedHashMap<>();

Review Comment:
   Add a test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to