wzhero1 commented on code in PR #6858:
URL: https://github.com/apache/paimon/pull/6858#discussion_r2652141739
##########
paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java:
##########
@@ -48,11 +56,176 @@
/** Unit tests for {@link AuditLogTable}. */
public class AuditLogTableTest extends TableTestBase {
- private static final String tableName = "MyTable";
+ private FileStoreTable baseTable;
private AuditLogTable auditLogTable;
@BeforeEach
public void before() throws Exception {
+ baseTable = createTableWithData("AuditLogTestTable");
+ Identifier auditLogId =
+ identifier(baseTable.name() + SYSTEM_TABLE_SPLITTER +
AuditLogTable.AUDIT_LOG);
+ auditLogTable = (AuditLogTable) catalog.getTable(auditLogId);
+ }
+
+ @Test
+ public void testBatchReadWithSystemFields() throws Exception {
+ List<InternalRow> result = read(auditLogTable);
+
+ // Verify we get 4 records (after compaction: 1 DELETE, 1
UPDATE_BEFORE, 1 UPDATE_AFTER, 1
+ // INSERT)
+ assertThat(result).hasSize(4);
+
+ // Verify all rows have correct system fields (ROW_KIND at index 0,
SEQUENCE_NUMBER at
+ // index 1)
+ for (InternalRow row : result) {
+ // ROW_KIND should be a BinaryString
+ assertThat(row.getString(0)).isNotNull();
+ // SEQUENCE_NUMBER should be a Long
+ assertThat(row.getLong(1)).isGreaterThanOrEqualTo(0L);
+ }
+
+ // Verify specific records
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ // DELETE: pk=1, pt=1, col1=1, seq=1
+ GenericRow.of(
+
BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1, 1),
+ // UPDATE_BEFORE: pk=1, pt=2, col1=5, seq=1
+ GenericRow.of(
+
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()),
+ 1L,
+ 1,
+ 2,
+ 5),
+ // UPDATE_AFTER: pk=1, pt=4, col1=6, seq=0 (new data)
+ GenericRow.of(
+
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ 0L,
+ 1,
+ 4,
+ 6),
+ // INSERT: pk=2, pt=3, col1=1, seq=0
+ GenericRow.of(
+
BinaryString.fromString(RowKind.INSERT.shortString()),
+ 0L,
+ 2,
+ 3,
+ 1));
+ }
+
+ @Test
+ public void testReadWithPartialFieldsAndOutOfOrder() throws Exception {
+ // Test reading with partial fields in a different order than table
schema
+ // AuditLogTable schema: [rowkind, _SEQUENCE_NUMBER, pk, pt, col1]
+ // We want to read: [pt, rowkind, col1] (out of order, partial fields)
+
+ List<InternalRow> result = readWithCustomProjection(auditLogTable);
+
+ // Verify we get 4 records
+ assertThat(result).hasSize(4);
+
+ // Verify field ordering: [pt, rowkind, col1]
+ for (InternalRow row : result) {
+ // Index 0: pt (INT)
+ assertThat(row.getInt(0)).isGreaterThan(0);
+ // Index 1: rowkind (STRING)
+ assertThat(row.getString(1)).isNotNull();
+ // Index 2: col1 (INT)
+ assertThat(row.getInt(2)).isGreaterThan(0);
+ }
+
+ // Verify specific records with the new field order
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ // pt=1, rowkind=DELETE, col1=1
+ GenericRow.of(1,
BinaryString.fromString(RowKind.DELETE.shortString()), 1),
+ // pt=2, rowkind=UPDATE_BEFORE, col1=5
+ GenericRow.of(
+ 2,
BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 5),
+ // pt=4, rowkind=UPDATE_AFTER, col1=6
+ GenericRow.of(
+ 4,
BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 6),
+ // pt=3, rowkind=INSERT, col1=1
+ GenericRow.of(3,
BinaryString.fromString(RowKind.INSERT.shortString()), 1));
+ }
+
+ /**
+ * Helper method to read with custom field projection in a specific order.
Reads fields [pt,
+ * rowkind, col1] from AuditLogTable.
+ */
+ private List<InternalRow> readWithCustomProjection(AuditLogTable table)
throws Exception {
+ RowType tableRowType = auditLogTable.rowType();
+ List<DataField> customFields =
+ Arrays.asList(
+ tableRowType.getField("pt"),
+ tableRowType.getField("rowkind"),
+ tableRowType.getField("col1"));
+ RowType customReadType = new RowType(customFields);
+
+ ReadBuilder readBuilder =
table.newReadBuilder().withReadType(customReadType);
+ RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ InternalRowSerializer serializer = new
InternalRowSerializer(customReadType);
+
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+ return rows;
+ }
+
+ @Test
+ public void testAppendOnlyTableWithRowKind() throws Exception {
+ FileStoreTable appendTable =
createAppendOnlyTable("AppendOnlyAuditTest");
+
+ TableWriteImpl<?> write = appendTable.newWrite("user0");
+ StreamTableCommit commit = appendTable.newCommit("user0");
+
+ write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 100L));
+ write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 200L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.close();
+ commit.close();
+
+ AuditLogTable auditLog = new AuditLogTable(appendTable);
+ List<InternalRow> result = read(auditLog);
+
+ // Verify specific records: [rowkind, seq, id, name, amount]
+ // Note: append-only table has null sequence number
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(
+ BinaryString.fromString("+I"),
+ null,
+ 1,
+ BinaryString.fromString("Alice"),
+ 100L),
+ GenericRow.of(
+ BinaryString.fromString("+I"),
+ null,
+ 2,
+ BinaryString.fromString("Bob"),
+ 200L));
+ }
+
+ // ==================== Helper Methods ====================
+
+ /** Creates a FileStoreTable with changelog producer and writes test data.
*/
+ private FileStoreTable createTableWithData(String tableName) throws
Exception {
+ FileStoreTable table = createChangelogTable(tableName);
+
+ // Write test data with different RowKinds
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1)); // Will be
deleted
+ write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5)); // Will be
updated
+ write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1)); // Remains
as insert
+
+ return table;
+ }
+
+ /** Creates a table with changelog producer enabled. */
+ private FileStoreTable createChangelogTable(String tableName) throws
Exception {
Review Comment:
Although it's currently used in only one place, I'd prefer to keep this helper
method for the sake of single responsibility and better extensibility. It
cleanly encapsulates the changelog table creation logic, making it easier to add
future tests without duplicating configuration code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]