JingsongLi commented on code in PR #5935:
URL: https://github.com/apache/paimon/pull/5935#discussion_r2234397194


##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java:
##########
@@ -100,8 +101,9 @@ public class DataFileMeta {
     private final SimpleStats keyStats;
     private final SimpleStats valueStats;
 
-    private final long minSequenceNumber;
-    private final long maxSequenceNumber;
+    // As for row-lineage table, this will be reassigned while committing
+    private long minSequenceNumber;

Review Comment:
   You should use a copy instead of making the field mutable.



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java:
##########
@@ -124,6 +126,8 @@ public class DataFileMeta {
     /** external path of file, if it is null, it is in the default warehouse 
path. */
     private final @Nullable String externalPath;
 
+    private @Nullable Long rowIdStart;

Review Comment:
   _FIRST_ROW_ID or rowIdStart?



##########
paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java:
##########
@@ -74,6 +78,13 @@ public CommitMessage doCompact(FileStoreTable table, 
BaseAppendFileStoreWrite wr
         Preconditions.checkArgument(
                 dvEnabled || compactBefore.size() > 1,
                 "AppendOnlyCompactionTask need more than one file input.");
+        if (table.coreOptions().rowTrackingEnabled()) {

Review Comment:
   Remove this? Let's finish compaction in the next PR.



##########
paimon-common/src/main/java/org/apache/paimon/utils/SequenceNumberCounter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+
+import java.io.Serializable;
+
+/** To count sequence number. */
+public interface SequenceNumberCounter extends Serializable {

Review Comment:
   `SequenceCounter`



##########
paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java:
##########
@@ -81,6 +102,29 @@ public FileRecordIterator<InternalRow> readBatch() throws 
IOException {
                 final ProjectedRow projectedRow = 
ProjectedRow.from(indexMapping);
                 iterator = iterator.transform(projectedRow::replaceRow);
             }
+
+            if (rowLineageEnabled && !metaColumnIndex.isEmpty()) {
+                GenericRow genericRow = new GenericRow(metaColumnIndex.size());

Review Comment:
   lineageRow



##########
paimon-core/src/main/java/org/apache/paimon/table/system/RowLineageTable.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.SimpleFileReader;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for reading row id of table. */
+public class RowLineageTable implements DataTable, ReadonlyTable {
+
+    public static final String WITH_METADATA = "row_lineage";

Review Comment:
   `ROW_LINEAGE`



##########
paimon-common/src/main/java/org/apache/paimon/utils/LongCounter.java:
##########
@@ -18,42 +18,36 @@
 
 package org.apache.paimon.utils;
 
-import java.io.Serializable;
+import org.apache.paimon.data.InternalRow;
 
 /** An counter that sums up {@code long} values. */
-public class LongCounter implements Serializable {
+public class LongCounter implements SequenceNumberCounter {

Review Comment:
   You don't need to change this class. This class only has about 50 lines, so we
don't need to reuse code for it.



##########
paimon-core/src/main/java/org/apache/paimon/rowlineage/PartialMappingRow.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rowlineage;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+/** Row with row lineage inject in. */
+public class PartialMappingRow implements InternalRow {

Review Comment:
   RowWithLineage



##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:
##########
@@ -1137,6 +1152,28 @@ CommitResult tryCommitOnce(
         return new SuccessResult();
     }
 
+    private long assignRowMeta(long startRowId, List<ManifestEntry> 
deltaFiles) {
+        if (deltaFiles.isEmpty()) {
+            return startRowId;
+        }
+        // assign row id for new files
+        long start = startRowId;
+        for (ManifestEntry entry : deltaFiles) {
+            if 
(entry.file().fileSource().orElse(FileSource.COMPACT).equals(FileSource.APPEND))
 {

Review Comment:
   Assert that the file must have a fileSource (instead of defaulting to COMPACT).



##########
paimon-common/src/main/java/org/apache/paimon/utils/MinSequenceCounter.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+
+/** Sequence number counter only generate min sequence. */
+public class MinSequenceCounter implements SequenceNumberCounter {

Review Comment:
   Remove it; it is only needed for compaction.



##########
paimon-common/src/main/java/org/apache/paimon/utils/SequenceNumberCounter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+
+import java.io.Serializable;
+
+/** To count sequence number. */
+public interface SequenceNumberCounter extends Serializable {

Review Comment:
   Why it should be `Serializable`?



##########
paimon-core/src/main/java/org/apache/paimon/rowlineage/PartialMappingRow.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rowlineage;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+/** Row with row lineage inject in. */
+public class PartialMappingRow implements InternalRow {
+
+    private InternalRow main;
+    private InternalRow rowLineage;
+    private final int[] mappings;
+
+    public PartialMappingRow(int[] mappings) {
+        this.mappings = mappings;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return main.getFieldCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return main.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        main.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        for (int i = 0; i < mappings.length; i++) {

Review Comment:
   Remove this loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to