yunfengzhou-hub commented on code in PR #6933:
URL: https://github.com/apache/paimon/pull/6933#discussion_r2667560386


##########
docs/layouts/shortcodes/generated/core_configuration.html:
##########
@@ -146,6 +146,12 @@
             <td>String</td>
             <td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
         </tr>
+        <tr>
+            <td><h5>changelog-read.sequence-number.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables.</td>

Review Comment:
   The output of an audit log table can be controlled by `incremental-between-scan-mode`, where `changelog` is only one of the options. So it would be better not to include the word "changelog" in the config option name.



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -87,25 +87,37 @@
 public class AuditLogTable implements DataTable, ReadonlyTable {
 
     public static final String AUDIT_LOG = "audit_log";
+    public static final String AUDIT_LOG_ENABLED = "audit.log.enabled";
 
-    public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
-            p -> {
-                if (p.index() == 0) {
-                    return Optional.empty();
-                }
-                return Optional.of(
-                        new LeafPredicate(
-                                p.function(),
-                                p.type(),
-                                p.index() - 1,
-                                p.fieldName(),
-                                p.literals()));
-            };
+    protected final FileStoreTable wrapped;
 
-    private final FileStoreTable wrapped;
+    /** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */
+    protected final int specialFieldCount;

Review Comment:
   How about introducing `List<SpecialField> specialFields`? Then we could change
   ```java
   fields.add(SpecialFields.ROW_KIND);
   if (specialFieldCount > 1) {
       fields.add(SpecialFields.SEQUENCE_NUMBER);
   }
   ```
   into 
   ```java
   fields.addAll(specialFields);
   ```
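A minimal sketch of this suggestion in plain Java, so it stands alone. `Field` is a hypothetical stand-in for a Paimon schema field, and the two constants are illustrative rather than the PR's actual definitions. The special fields are decided once, so `specialFields.size()` can take over the role of `specialFieldCount`, and appending a whole list takes `addAll` rather than `add`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a schema field: only a name and a type name. */
final class Field {
    final String name;
    final String type;

    Field(String name, String type) {
        this.name = name;
        this.type = type;
    }
}

final class AuditLogSchemaSketch {
    // Illustrative special fields; the real ones would come from the PR's definitions.
    static final Field ROW_KIND = new Field("rowkind", "STRING");
    static final Field SEQUENCE_NUMBER = new Field("_SEQUENCE_NUMBER", "BIGINT");

    /** Decides the special fields once, instead of tracking a bare count. */
    static List<Field> specialFields(boolean includeSequenceNumber) {
        List<Field> special = new ArrayList<>();
        special.add(ROW_KIND);
        if (includeSequenceNumber) {
            special.add(SEQUENCE_NUMBER);
        }
        return Collections.unmodifiableList(special);
    }

    /** Output field names: the special fields first, then the wrapped table's fields. */
    static List<String> fieldNames(List<Field> specialFields, List<Field> dataFields) {
        List<Field> fields = new ArrayList<>();
        fields.addAll(specialFields); // addAll, not add: append the list's elements
        fields.addAll(dataFields);
        List<String> names = new ArrayList<>();
        for (Field field : fields) {
            names.add(field.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Field> data =
                Arrays.asList(new Field("pk", "INT"), new Field("pt", "INT"), new Field("col1", "INT"));
        // Prints [rowkind, _SEQUENCE_NUMBER, pk, pt, col1]
        System.out.println(fieldNames(specialFields(true), data));
    }
}
```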



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -87,25 +87,37 @@
 public class AuditLogTable implements DataTable, ReadonlyTable {
 
     public static final String AUDIT_LOG = "audit_log";
+    public static final String AUDIT_LOG_ENABLED = "audit.log.enabled";
 
-    public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
-            p -> {
-                if (p.index() == 0) {
-                    return Optional.empty();
-                }
-                return Optional.of(
-                        new LeafPredicate(
-                                p.function(),
-                                p.type(),
-                                p.index() - 1,
-                                p.fieldName(),
-                                p.literals()));
-            };
+    protected final FileStoreTable wrapped;
 
-    private final FileStoreTable wrapped;
+    /** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */
+    protected final int specialFieldCount;
 
     public AuditLogTable(FileStoreTable wrapped) {
         this.wrapped = wrapped;
+        this.wrapped.schema().options().put(AUDIT_LOG_ENABLED, "true");

Review Comment:
   How about organizing the two new configurations as follows?
   1. The user sets config A.
   2. In `AuditLogTable`, the code checks whether config A is set. If so, it sets config B to true.
   3. In `ValueContentRowDataRecordIterator`, the code checks only config B, instead of config A && config B.
   
   The benefit of this structure is that it decouples system table concepts from `ValueContentRowDataRecordIterator`: config B need not be related to the audit log table.
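The three steps could look roughly like the sketch below. The keys `config-a` and `config-b` are placeholders for the two options, plain maps stand in for Paimon's option classes, and the method names are hypothetical. The system table derives config B from config A on a copy of its options, and the iterator reads only config B, so it never needs to know about the audit log table.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigDecouplingSketch {
    // Placeholder keys: A is the user-facing option, B is the generic reader switch.
    static final String CONFIG_A = "config-a";
    static final String CONFIG_B = "config-b";

    /** Step 2: the system table translates config A into config B for its own reads. */
    static Map<String, String> auditLogReadOptions(Map<String, String> tableOptions) {
        // Work on a copy so the wrapped table's options are left untouched.
        Map<String, String> readOptions = new HashMap<>(tableOptions);
        if (Boolean.parseBoolean(tableOptions.getOrDefault(CONFIG_A, "false"))) {
            readOptions.put(CONFIG_B, "true");
        }
        return readOptions;
    }

    /** Step 3: the record iterator checks config B only; it has no system-table knowledge. */
    static boolean iteratorFeatureEnabled(Map<String, String> readOptions) {
        return Boolean.parseBoolean(readOptions.getOrDefault(CONFIG_B, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CONFIG_A, "true"); // step 1: the user sets config A
        // Prints true
        System.out.println(iteratorFeatureEnabled(auditLogReadOptions(tableOptions)));
    }
}
```

Copying the options instead of writing into the wrapped table's map is a choice of this sketch; it keeps the derived flag scoped to the system table's own reads.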



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -723,15 +756,26 @@ public void setRowKind(RowKind kind) {
         @Override
         public boolean isNullAt(int pos) {
             if (indexMapping[pos] < 0) {
-                // row kind is always not null
+                // row kind and sequence num are always not null
                 return false;
             }
             return super.isNullAt(pos);
         }
 
+        @Override
+        public long getLong(int pos) {
+            int index = indexMapping[pos];
+            if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) {
+                // _SEQUENCE_NUMBER is at index 0 in bottom output

Review Comment:
   The reason behind this comment lives in another class (`ValueContentRowDataRecordIterator`), and these two classes are linked mainly through `AUDIT_LOG_ENABLED`, which expresses a generic concept unrelated to sequence numbers.
   
   Thus other developers may find it hard to understand why "_SEQUENCE_NUMBER is at index 0". We might need to think about how to improve code readability here.
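One possible direction, sketched under assumptions: keep the layout fact in a single, named place that both the record iterator and the audit-log row consult, so the "index 0" rule is documented where it is defined. `AuditLogLayout` and its members are hypothetical names, not existing Paimon classes.

```java
/**
 * Hypothetical single source of truth for the physical row layout produced by the reader.
 * When sequence numbers are requested, the reader prepends _SEQUENCE_NUMBER to each row,
 * so it sits at SEQUENCE_NUMBER_POS and the table's own fields shift right by one.
 */
final class AuditLogLayout {
    /** Position of _SEQUENCE_NUMBER in the underlying row when it is present. */
    static final int SEQUENCE_NUMBER_POS = 0;

    private final boolean withSequenceNumber;

    AuditLogLayout(boolean withSequenceNumber) {
        this.withSequenceNumber = withSequenceNumber;
    }

    /** How many physical fields the reader writes before the table's own fields. */
    int prependedFieldCount() {
        return withSequenceNumber ? 1 : 0;
    }

    /** Underlying position of the i-th field of the wrapped table. */
    int dataFieldPos(int dataFieldIndex) {
        return prependedFieldCount() + dataFieldIndex;
    }

    public static void main(String[] args) {
        // Prints 1: with _SEQUENCE_NUMBER at 0, the first table field moves to 1.
        System.out.println(new AuditLogLayout(true).dataFieldPos(0));
    }
}
```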



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java:
##########
@@ -1053,6 +1053,108 @@ public void testBinlogTableWithProjection() {
                 .containsExactly(Row.of("+I", new String[] {"A"}));
     }
 
+    @Test
+    public void testAuditLogTableWithSequenceNumberEnabled() {
+        // Create primary key table with changelog-read.sequence-number.enabled option
+        sql(
+                "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) "
+                        + "WITH ('changelog-read.sequence-number.enabled'='true');");

Review Comment:
   Although users are allowed to set `changelog-read.sequence-number.enabled` when creating or altering a table, a better approach is to configure it dynamically in the SELECT query through Flink [SQL Hints](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/sql/queries/hints/). Hints suit configurations like this one, which only matter for specific read queries. SQL hints are used as follows:
   ```sql
   SELECT * FROM `test_table_seq$audit_log` /*+ OPTIONS('changelog-read.sequence-number.enabled' = 'true') */;
   ```
   Let's add test cases for this use case as well.
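To make the difference concrete: an `OPTIONS` hint changes only the options seen by that one query, while the table definition keeps its own. The sketch below models this as a map merge in which hint options take precedence; it is a simplification for illustration, not Flink's or Paimon's actual code path.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class DynamicOptionsSketch {
    static final String SEQ_KEY = "changelog-read.sequence-number.enabled";

    /** Options seen by one query: table options, overridden by that query's hint options. */
    static Map<String, String> effectiveOptions(
            Map<String, String> tableOptions, Map<String, String> hintOptions) {
        Map<String, String> merged = new HashMap<>(tableOptions);
        merged.putAll(hintOptions); // hint values win, for this query only
        return Collections.unmodifiableMap(merged);
    }

    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>(); // option not set on the table
        Map<String, String> hint = Collections.singletonMap(SEQ_KEY, "true");
        // Prints true for the hinted query; tableOptions itself is unchanged.
        System.out.println(effectiveOptions(tableOptions, hint).get(SEQ_KEY));
    }
}
```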



##########
paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java:
##########
@@ -48,60 +48,117 @@
 /** Unit tests for {@link AuditLogTable}. */
 public class AuditLogTableTest extends TableTestBase {
 
-    private static final String tableName = "MyTable";
-    private AuditLogTable auditLogTable;
+    @Test
+    public void testReadAuditLogFromLatest() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table", false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+        List<InternalRow> expectRow = getExpectedResult();
+        List<InternalRow> result = read(auditLogTable);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithTableOption() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table_with_seq", true);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+        List<InternalRow> result = read(auditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithAlterTable() throws Exception {
+        String tableName = "audit_table_alter_seq";
+        // Create table without sequence-number option
+        AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        // Add sequence-number option via alterTable
+        catalog.alterTable(
+                identifier(tableName),
+                SchemaChange.setOption(
+                        CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+                false);
 
-    @BeforeEach
-    public void before() throws Exception {
+        // Re-fetch the audit_log table to get updated schema
+        Identifier auditLogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
+        AuditLogTable updatedAuditLogTable = (AuditLogTable) catalog.getTable(auditLogTableId);
+
+        // Verify schema now includes _SEQUENCE_NUMBER
+        assertThat(updatedAuditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+        List<InternalRow> result = read(updatedAuditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    private AuditLogTable createAuditLogTable(String tableName, boolean enableSequenceNumber)
+            throws Exception {
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
         FileIO fileIO = LocalFileIO.create();
 
-        Schema schema =
+        Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("pk", DataTypes.INT())
                         .column("pt", DataTypes.INT())
                         .column("col1", DataTypes.INT())
                         .partitionKeys("pt")
                         .primaryKey("pk", "pt")
                         .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
-                        .option("bucket", "1")
-                        .build();
+                        .option("bucket", "1");
+        if (enableSequenceNumber) {
+            schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
+        }
 
         TableSchema tableSchema =
-                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath), schemaBuilder.build());
         FileStoreTable table =
                 FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
-        Identifier filesTableId =
+
+        writeTestData(table);
+
+        Identifier auditLogTableId =
                 identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
-        auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+        return (AuditLogTable) catalog.getTable(auditLogTableId);
+    }
 
+    private void writeTestData(FileStoreTable table) throws Exception {
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
         write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
-        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
         write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
     }
 
-    @Test
-    public void testReadAuditLogFromLatest() throws Exception {
-        List<InternalRow> expectRow = getExpectedResult();
-        List<InternalRow> result = read(auditLogTable);
-        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
-    }
-
     private List<InternalRow> getExpectedResult() {
         List<InternalRow> expectedRow = new ArrayList<>();
         expectedRow.add(
                 GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
         expectedRow.add(
                 GenericRow.of(
-                        BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5));
+                        BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6));

Review Comment:
   In the original code, the expected result contains 4 rows. Here only three 
rows are left, missing an `UPDATE_BEFORE` row.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java:
##########
@@ -1053,6 +1053,108 @@ public void testBinlogTableWithProjection() {
                 .containsExactly(Row.of("+I", new String[] {"A"}));
     }
 
+    @Test
+    public void testAuditLogTableWithSequenceNumberEnabled() {
+        // Create primary key table with changelog-read.sequence-number.enabled option
+        sql(
+                "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) "
+                        + "WITH ('changelog-read.sequence-number.enabled'='true');");

Review Comment:
   Let's also add test cases to verify what happens if `changelog-read.sequence-number.enabled` is configured on append-only tables.



##########
paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java:
##########
@@ -723,15 +756,26 @@ public void setRowKind(RowKind kind) {
         @Override
         public boolean isNullAt(int pos) {
             if (indexMapping[pos] < 0) {
-                // row kind is always not null
+                // row kind and sequence num are always not null
                 return false;
             }
             return super.isNullAt(pos);
         }
 
+        @Override
+        public long getLong(int pos) {
+            int index = indexMapping[pos];
+            if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) {
+                // _SEQUENCE_NUMBER is at index 0 in bottom output
+                return row.getLong(0);
+            }
+            return super.getLong(pos);

Review Comment:
   If _SEQUENCE_NUMBER is at index 0, then the other fields should be acquired 
through `pos - 1`.
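A minimal illustration of that offset, assuming no projection and a physical row of the form `[_SEQUENCE_NUMBER, pk, pt, col1]` (modeled as a `long[]` for brevity). `rowkind` is synthesized by the wrapper and never stored, so every output position after it maps to `pos - 1` in the physical row, `_SEQUENCE_NUMBER` included. `AuditRowSketch` is hypothetical and ignores projection, which the real `indexMapping` also has to handle.

```java
/** Hypothetical audit-log row over a physical row laid out as [_SEQUENCE_NUMBER, data...]. */
final class AuditRowSketch {
    private final String rowKind; // output position 0, synthesized by the wrapper
    private final long[] physical; // physical[0] is _SEQUENCE_NUMBER

    AuditRowSketch(String rowKind, long[] physical) {
        this.rowKind = rowKind;
        this.physical = physical;
    }

    String getRowKind() {
        return rowKind;
    }

    /** Output position pos >= 1 reads physical position pos - 1. */
    long getLong(int pos) {
        if (pos == 0) {
            throw new IllegalArgumentException("position 0 is rowkind, not a long");
        }
        return physical[pos - 1];
    }

    public static void main(String[] args) {
        // Output layout: [rowkind, _SEQUENCE_NUMBER, pk, pt, col1]; illustrative values.
        AuditRowSketch row = new AuditRowSketch("+I", new long[] {7L, 2L, 3L, 1L});
        // Prints 7 (_SEQUENCE_NUMBER), then 2 (pk)
        System.out.println(row.getLong(1));
        System.out.println(row.getLong(2));
    }
}
```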



##########
paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java:
##########
@@ -48,60 +48,117 @@
 /** Unit tests for {@link AuditLogTable}. */
 public class AuditLogTableTest extends TableTestBase {
 
-    private static final String tableName = "MyTable";
-    private AuditLogTable auditLogTable;
+    @Test
+    public void testReadAuditLogFromLatest() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table", false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+        List<InternalRow> expectRow = getExpectedResult();
+        List<InternalRow> result = read(auditLogTable);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithTableOption() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table_with_seq", true);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+        List<InternalRow> result = read(auditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithAlterTable() throws Exception {
+        String tableName = "audit_table_alter_seq";
+        // Create table without sequence-number option
+        AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        // Add sequence-number option via alterTable
+        catalog.alterTable(
+                identifier(tableName),
+                SchemaChange.setOption(
+                        CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+                false);
 
-    @BeforeEach
-    public void before() throws Exception {
+        // Re-fetch the audit_log table to get updated schema
+        Identifier auditLogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
+        AuditLogTable updatedAuditLogTable = (AuditLogTable) catalog.getTable(auditLogTableId);
+
+        // Verify schema now includes _SEQUENCE_NUMBER
+        assertThat(updatedAuditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+        List<InternalRow> result = read(updatedAuditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    private AuditLogTable createAuditLogTable(String tableName, boolean enableSequenceNumber)
+            throws Exception {
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
         FileIO fileIO = LocalFileIO.create();
 
-        Schema schema =
+        Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("pk", DataTypes.INT())
                         .column("pt", DataTypes.INT())
                         .column("col1", DataTypes.INT())
                         .partitionKeys("pt")
                         .primaryKey("pk", "pt")
                         .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
-                        .option("bucket", "1")
-                        .build();
+                        .option("bucket", "1");
+        if (enableSequenceNumber) {
+            schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
+        }
 
         TableSchema tableSchema =
-                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath), schemaBuilder.build());
         FileStoreTable table =
                 FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
-        Identifier filesTableId =
+
+        writeTestData(table);
+
+        Identifier auditLogTableId =
                 identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
-        auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+        return (AuditLogTable) catalog.getTable(auditLogTableId);
+    }
 
+    private void writeTestData(FileStoreTable table) throws Exception {
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
         write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
-        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));

Review Comment:
   This change seems unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
