wuchong commented on code in PR #2105: URL: https://github.com/apache/fluss/pull/2105#discussion_r2631606611
########## fluss-common/src/main/java/org/apache/fluss/metadata/ChangelogImage.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metadata; + +/** + * The changelog image mode for the primary key table. + * + * <p>This enum defines what information is included in the changelog for update operations. It is + * inspired by similar configurations in database systems like MySQL's binlog_row_image and + * PostgreSQL's replica identity. + * + * @since 0.9 + */ +public enum ChangelogImage { + + /** + * Full changelog with both UPDATE_BEFORE and UPDATE_AFTER records. This is the default behavior + * that captures complete information about updates, allowing tracking of previous values. + */ + FULL, + + /** + * WAL mode does not produce UPDATE_BEFORE records. Only INSERT, UPDATE_AFTER (and DELETE if + * allowed) records are emitted. When WAL mode is enabled with default merge engine (no merge + * engine configured) and full row updates (not partial update), an optimization is applied to + * skip looking up old values, and in this case INSERT operations are converted to UPDATE_AFTER + * events, similar to database WAL (Write-Ahead Log) behavior. 
This mode reduces storage and + * transmission costs but loses the ability to track previous values. + */ + WAL; + + /** Creates a {@link ChangelogImage} from the given string. */ + public static ChangelogImage fromString(String image) { + switch (image.toUpperCase().replace("-", "_")) { Review Comment: Why do we need to replace `-` with `_`? ########## fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java: ########## @@ -1044,6 +1044,120 @@ void testAppendDuplicatedKvBatch() throws Exception { assertThat(kvTablet.getKvPreWriteBuffer().getMaxLSN()).isEqualTo(9); } + @Test + void testWalModeChangelogImageNoUpdateBefore() throws Exception { + // WAL mode - no UPDATE_BEFORE. With default merge engine and full row update, + // optimization converts INSERT to UPDATE_AFTER + Map<String, String> config = new HashMap<>(); + config.put("table.changelog.image", "WAL"); + initLogTabletAndKvTablet(DATA1_SCHEMA_PK, config); + RowType rowType = DATA1_SCHEMA_PK.getRowType(); + + // Insert two records + List<KvRecord> kvData1 = + Arrays.asList( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v11"}), + kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v21"})); + KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1); + kvTablet.putAsLeader(kvRecordBatch1, null); + long endOffset = logTablet.localLogEndOffset(); + + // Verify inserts produce +U (optimization in WAL mode with default merge engine and full + // row update) + LogRecords actualLogRecords = readLogRecords(); + MemoryLogRecords expectedLogs = + logRecords( + 0L, + Arrays.asList(ChangeType.UPDATE_AFTER, ChangeType.UPDATE_AFTER), + Arrays.asList(new Object[] {1, "v11"}, new Object[] {2, "v21"})); + checkEqual(actualLogRecords, Collections.singletonList(expectedLogs)); + + // Update the records - should only produce UPDATE_AFTER, no UPDATE_BEFORE + List<KvRecord> kvData2 = + Arrays.asList( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, "v12"}), + 
kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v22"})); + KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); + kvTablet.putAsLeader(kvRecordBatch2, null); + + // Verify updates only produce +U, not -U + actualLogRecords = readLogRecords(endOffset); + expectedLogs = + logRecords( + endOffset, + Arrays.asList(ChangeType.UPDATE_AFTER, ChangeType.UPDATE_AFTER), + Arrays.asList(new Object[] {1, "v12"}, new Object[] {2, "v22"})); + checkEqual(actualLogRecords, Collections.singletonList(expectedLogs)); + endOffset = logTablet.localLogEndOffset(); + + // Delete one record - should still produce DELETE + List<KvRecord> kvData3 = + Collections.singletonList(kvRecordFactory.ofRecord("k1".getBytes(), null)); + KvRecordBatch kvRecordBatch3 = kvRecordBatchFactory.ofRecords(kvData3); + kvTablet.putAsLeader(kvRecordBatch3, null); + + // Verify delete produces -D + actualLogRecords = readLogRecords(endOffset); + expectedLogs = + logRecords( + endOffset, + Collections.singletonList(ChangeType.DELETE), + Collections.singletonList(new Object[] {1, "v12"})); + checkEqual(actualLogRecords, Collections.singletonList(expectedLogs)); + + // Verify KV store has correct final state + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes()))).isNotNull(); + assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k2".getBytes()))) + .isEqualTo(valueOf(compactedRow(rowType, new Object[] {2, "v22"}))); + } + + @Test + void testWalModeChangelogImageNoUpdateBeforeWithPartialUpdate() throws Exception { + // WAL mode with partial update - INSERT produces INSERT, UPDATE produces UPDATE_AFTER + // only (no optimization applied) + Map<String, String> config = new HashMap<>(); + config.put("table.changelog.image", "WAL"); + initLogTabletAndKvTablet(DATA2_SCHEMA, config); + RowType rowType = DATA2_SCHEMA.getRowType(); + KvRecordTestUtils.KvRecordFactory data2kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(rowType); + + // Insert with partial columns 
 (column a only) + KvRecordBatch kvRecordBatch1 = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, null, null})); + kvTablet.putAsLeader(kvRecordBatch1, new int[] {0}); + + long endOffset = logTablet.localLogEndOffset(); + // Verify insert produces +I (partial update goes through normal path) + LogRecords actualLogRecords = readLogRecords(); + MemoryLogRecords expectedLogs = + logRecords( + rowType, + 0L, + Collections.singletonList(ChangeType.INSERT), + Collections.singletonList(new Object[] {1, null, null})); + checkEqual(actualLogRecords, Collections.singletonList(expectedLogs), rowType); + + // Update with partial columns (column b) + KvRecordBatch kvRecordBatch2 = + kvRecordBatchFactory.ofRecords( + data2kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, "v1", null})); + kvTablet.putAsLeader(kvRecordBatch2, new int[] {0, 1}); + + // Verify update only produces +U (no -U since using WAL mode) + actualLogRecords = readLogRecords(endOffset); + expectedLogs = + logRecords( + rowType, + endOffset, + Collections.singletonList(ChangeType.UPDATE_AFTER), + Collections.singletonList(new Object[] {1, "v1", null})); Review Comment: It would be better to add a partial update on column `c` as well, and the result should retain the value `v1` for column `b`. Otherwise, it's hard to verify the behavior of partial updates, because currently all the non-updated columns are null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
