wzhero1 commented on code in PR #6933:
URL: https://github.com/apache/paimon/pull/6933#discussion_r2680972958


##########
paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java:
##########
@@ -48,60 +48,117 @@
 /** Unit tests for {@link AuditLogTable}. */
 public class AuditLogTableTest extends TableTestBase {
 
-    private static final String tableName = "MyTable";
-    private AuditLogTable auditLogTable;
+    @Test
+    public void testReadAuditLogFromLatest() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table", 
false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+        List<InternalRow> expectRow = getExpectedResult();
+        List<InternalRow> result = read(auditLogTable);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithTableOption() throws Exception {
+        AuditLogTable auditLogTable = 
createAuditLogTable("audit_table_with_seq", true);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(auditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithAlterTable() throws Exception {
+        String tableName = "audit_table_alter_seq";
+        // Create table without sequence-number option
+        AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        // Add sequence-number option via alterTable
+        catalog.alterTable(
+                identifier(tableName),
+                SchemaChange.setOption(
+                        
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+                false);
 
-    @BeforeEach
-    public void before() throws Exception {
+        // Re-fetch the audit_log table to get updated schema
+        Identifier auditLogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
+        AuditLogTable updatedAuditLogTable = (AuditLogTable) 
catalog.getTable(auditLogTableId);
+
+        // Verify schema now includes _SEQUENCE_NUMBER
+        assertThat(updatedAuditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(updatedAuditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    private AuditLogTable createAuditLogTable(String tableName, boolean 
enableSequenceNumber)
+            throws Exception {
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, tableName));
         FileIO fileIO = LocalFileIO.create();
 
-        Schema schema =
+        Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("pk", DataTypes.INT())
                         .column("pt", DataTypes.INT())
                         .column("col1", DataTypes.INT())
                         .partitionKeys("pt")
                         .primaryKey("pk", "pt")
                         .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
-                        .option("bucket", "1")
-                        .build();
+                        .option("bucket", "1");
+        if (enableSequenceNumber) {
+            
schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"true");
+        }
 
         TableSchema tableSchema =
-                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), 
schema);
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath), 
schemaBuilder.build());
         FileStoreTable table =
                 FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
-        Identifier filesTableId =
+
+        writeTestData(table);
+
+        Identifier auditLogTableId =
                 identifier(tableName + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
-        auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+        return (AuditLogTable) catalog.getTable(auditLogTableId);
+    }
 
+    private void writeTestData(FileStoreTable table) throws Exception {
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
         write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
-        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));

Review Comment:
   The original test case is incorrect - the primary keys for 
`RowKind.UPDATE_BEFORE` and `RowKind.UPDATE_AFTER` are inconsistent. This 
scenario should not occur in normal circumstances.



##########
paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java:
##########
@@ -48,60 +48,117 @@
 /** Unit tests for {@link AuditLogTable}. */
 public class AuditLogTableTest extends TableTestBase {
 
-    private static final String tableName = "MyTable";
-    private AuditLogTable auditLogTable;
+    @Test
+    public void testReadAuditLogFromLatest() throws Exception {
+        AuditLogTable auditLogTable = createAuditLogTable("audit_table", 
false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+        List<InternalRow> expectRow = getExpectedResult();
+        List<InternalRow> result = read(auditLogTable);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithTableOption() throws Exception {
+        AuditLogTable auditLogTable = 
createAuditLogTable("audit_table_with_seq", true);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(auditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    @Test
+    public void testReadSequenceNumberWithAlterTable() throws Exception {
+        String tableName = "audit_table_alter_seq";
+        // Create table without sequence-number option
+        AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+        assertThat(auditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "pk", "pt", "col1");
+
+        // Add sequence-number option via alterTable
+        catalog.alterTable(
+                identifier(tableName),
+                SchemaChange.setOption(
+                        
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+                false);
 
-    @BeforeEach
-    public void before() throws Exception {
+        // Re-fetch the audit_log table to get updated schema
+        Identifier auditLogTableId =
+                identifier(tableName + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
+        AuditLogTable updatedAuditLogTable = (AuditLogTable) 
catalog.getTable(auditLogTableId);
+
+        // Verify schema now includes _SEQUENCE_NUMBER
+        assertThat(updatedAuditLogTable.rowType().getFieldNames())
+                .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", 
"col1");
+
+        List<InternalRow> result = read(updatedAuditLogTable);
+        List<InternalRow> expectRow = getExpectedResultWithSequenceNumber();
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+    }
+
+    private AuditLogTable createAuditLogTable(String tableName, boolean 
enableSequenceNumber)
+            throws Exception {
         Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, tableName));
         FileIO fileIO = LocalFileIO.create();
 
-        Schema schema =
+        Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .column("pk", DataTypes.INT())
                         .column("pt", DataTypes.INT())
                         .column("col1", DataTypes.INT())
                         .partitionKeys("pt")
                         .primaryKey("pk", "pt")
                         .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
-                        .option("bucket", "1")
-                        .build();
+                        .option("bucket", "1");
+        if (enableSequenceNumber) {
+            
schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), 
"true");
+        }
 
         TableSchema tableSchema =
-                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), 
schema);
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath), 
schemaBuilder.build());
         FileStoreTable table =
                 FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
-        Identifier filesTableId =
+
+        writeTestData(table);
+
+        Identifier auditLogTableId =
                 identifier(tableName + SYSTEM_TABLE_SPLITTER + 
AuditLogTable.AUDIT_LOG);
-        auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+        return (AuditLogTable) catalog.getTable(auditLogTableId);
+    }
 
+    private void writeTestData(FileStoreTable table) throws Exception {
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
         write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
         write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
-        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+        write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
         write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
     }
 
-    @Test
-    public void testReadAuditLogFromLatest() throws Exception {
-        List<InternalRow> expectRow = getExpectedResult();
-        List<InternalRow> result = read(auditLogTable);
-        assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
-    }
-
     private List<InternalRow> getExpectedResult() {
         List<InternalRow> expectedRow = new ArrayList<>();
         expectedRow.add(
                 
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
         expectedRow.add(
                 GenericRow.of(
-                        
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5));
+                        
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6));

Review Comment:
   The original test case is incorrect - the primary keys for 
`RowKind.UPDATE_BEFORE` and `RowKind.UPDATE_AFTER` are inconsistent. This 
scenario should not occur in normal circumstances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to