This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a908690ed [common] Support COMPACTED format for log tables (#1605)
a908690ed is described below

commit a908690ed465d1a4349feebb9b92f80c61a41053
Author: Giannis Polyzos <[email protected]>
AuthorDate: Thu Dec 25 13:24:19 2025 +0200

    [common] Support COMPACTED format for log tables (#1605)
    
    
    Co-authored-by: Jark Wu <[email protected]>
---
 .../client/table/writer/AppendWriterImpl.java      |  25 +++
 .../client/table/writer/UpsertWriterImpl.java      |  11 +-
 ...iteBatch.java => AbstractRowLogWriteBatch.java} |  68 ++++---
 .../fluss/client/write/CompactedLogWriteBatch.java |  64 +++++++
 .../fluss/client/write/IndexedLogWriteBatch.java   | 105 ++---------
 .../fluss/client/write/RecordAccumulator.java      | 109 +++++++-----
 .../org/apache/fluss/client/write/WriteFormat.java |  47 ++++-
 .../org/apache/fluss/client/write/WriteRecord.java |  39 ++++-
 .../fluss/client/table/FlussTableITCase.java       | 195 ++++++++++++++++++++-
 ...chTest.java => CompactedLogWriteBatchTest.java} |  61 +++----
 .../client/write/IndexedLogWriteBatchTest.java     |   9 +-
 .../fluss/client/write/KvWriteBatchTest.java       |   1 +
 .../record/MemoryLogRecordsCompactedBuilder.java   |   3 +-
 .../record/MemoryLogRecordsIndexedBuilder.java     |   2 +-
 ...uilder.java => MemoryLogRecordsRowBuilder.java} |   4 +-
 15 files changed, 519 insertions(+), 224 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
index d702e9621..b3a3f8131 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
@@ -26,9 +26,12 @@ import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
 import org.apache.fluss.row.encode.IndexedRowEncoder;
 import org.apache.fluss.row.encode.KeyEncoder;
 import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
@@ -44,6 +47,7 @@ class AppendWriterImpl extends AbstractTableWriter implements 
AppendWriter {
 
     private final LogFormat logFormat;
     private final IndexedRowEncoder indexedRowEncoder;
+    private final CompactedRowEncoder compactedRowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
 
@@ -58,8 +62,12 @@ class AppendWriterImpl extends AbstractTableWriter 
implements AppendWriter {
             this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, 
lakeFormat);
         }
 
+        DataType[] fieldDataTypes =
+                tableInfo.getSchema().getRowType().getChildren().toArray(new 
DataType[0]);
+
         this.logFormat = tableInfo.getTableConfig().getLogFormat();
         this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
+        this.compactedRowEncoder = new CompactedRowEncoder(fieldDataTypes);
         this.fieldGetters = 
InternalRow.createFieldGetters(tableInfo.getRowType());
         this.tableInfo = tableInfo;
     }
@@ -80,6 +88,11 @@ class AppendWriterImpl extends AbstractTableWriter 
implements AppendWriter {
         if (logFormat == LogFormat.INDEXED) {
             IndexedRow indexedRow = encodeIndexedRow(row);
             record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, 
indexedRow, bucketKey);
+        } else if (logFormat == LogFormat.COMPACTED) {
+            CompactedRow compactedRow = encodeCompactedRow(row);
+            record =
+                    WriteRecord.forCompactedAppend(
+                            tableInfo, physicalPath, compactedRow, bucketKey);
         } else {
             // ARROW format supports general internal row
             record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, 
bucketKey);
@@ -87,6 +100,18 @@ class AppendWriterImpl extends AbstractTableWriter 
implements AppendWriter {
         return send(record).thenApply(ignored -> APPEND_SUCCESS);
     }
 
+    private CompactedRow encodeCompactedRow(InternalRow row) {
+        if (row instanceof CompactedRow) {
+            return (CompactedRow) row;
+        }
+
+        compactedRowEncoder.startNewRow();
+        for (int i = 0; i < fieldCount; i++) {
+            compactedRowEncoder.encodeField(i, 
fieldGetters[i].getFieldOrNull(row));
+        }
+        return compactedRowEncoder.finishRow();
+    }
+
     private IndexedRow encodeIndexedRow(InternalRow row) {
         if (row instanceof IndexedRow) {
             return (IndexedRow) row;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 39f65592c..20e5bcc5e 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.client.table.writer;
 
+import org.apache.fluss.client.write.WriteFormat;
 import org.apache.fluss.client.write.WriteRecord;
 import org.apache.fluss.client.write.WriterClient;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -50,6 +51,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements 
UpsertWriter {
     private final KeyEncoder bucketKeyEncoder;
 
     private final KvFormat kvFormat;
+    private final WriteFormat writeFormat;
     private final RowEncoder rowEncoder;
     private final FieldGetter[] fieldGetters;
     private final TableInfo tableInfo;
@@ -78,6 +80,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements 
UpsertWriter {
                         : KeyEncoder.of(rowType, tableInfo.getBucketKeys(), 
lakeFormat);
 
         this.kvFormat = tableInfo.getTableConfig().getKvFormat();
+        this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
         this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
         this.tableInfo = tableInfo;
@@ -164,6 +167,7 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
                         encodeRow(row),
                         key,
                         bucketKey,
+                        writeFormat,
                         targetColumns);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
@@ -182,7 +186,12 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
                 bucketKeyEncoder == primaryKeyEncoder ? key : 
bucketKeyEncoder.encodeKey(row);
         WriteRecord record =
                 WriteRecord.forDelete(
-                        tableInfo, getPhysicalPath(row), key, bucketKey, 
targetColumns);
+                        tableInfo,
+                        getPhysicalPath(row),
+                        key,
+                        bucketKey,
+                        writeFormat,
+                        targetColumns);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
similarity index 70%
copy from 
fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
copy to 
fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
index fb9655658..3c61cce82 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
@@ -17,18 +17,14 @@
 
 package org.apache.fluss.client.write;
 
-import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.memory.AbstractPagedOutputView;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
+import org.apache.fluss.record.MemoryLogRecordsRowBuilder;
 import org.apache.fluss.record.bytesview.BytesView;
-import org.apache.fluss.row.indexed.IndexedRow;
-import org.apache.fluss.rpc.messages.ProduceLogRequest;
-
-import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fluss.row.InternalRow;
 
 import java.io.IOException;
 import java.util.List;
@@ -37,55 +33,54 @@ import static 
org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
- * A batch of log records managed in INDEXED format that is or will be sent to 
server by {@link
- * ProduceLogRequest}.
- *
- * <p>This class is not thread safe and external synchronization must be used 
when modifying it.
+ * Abstract base class to deduplicate logic for row-based log write batches 
backed by in-memory
+ * builders. Concrete subclasses only need to provide a row-type 
validator/caster and a
+ * RecordsBuilderAdapter implementation.
  */
-@NotThreadSafe
-@Internal
-public final class IndexedLogWriteBatch extends WriteBatch {
+abstract class AbstractRowLogWriteBatch<R> extends WriteBatch {
+
     private final AbstractPagedOutputView outputView;
-    private final MemoryLogRecordsIndexedBuilder recordsBuilder;
+    private final MemoryLogRecordsRowBuilder<R> recordsBuilder;
+    private final String buildErrorMessage;
 
-    public IndexedLogWriteBatch(
+    protected AbstractRowLogWriteBatch(
             int bucketId,
             PhysicalTablePath physicalTablePath,
-            int schemaId,
-            int writeLimit,
+            long createdMs,
             AbstractPagedOutputView outputView,
-            long createdMs) {
+            MemoryLogRecordsRowBuilder<R> recordsBuilder,
+            String buildErrorMessage) {
         super(bucketId, physicalTablePath, createdMs);
         this.outputView = outputView;
-        this.recordsBuilder =
-                MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, 
outputView, true);
-    }
-
-    @Override
-    public boolean isLogBatch() {
-        return true;
+        this.recordsBuilder = recordsBuilder;
+        this.buildErrorMessage = buildErrorMessage;
     }
 
     @Override
     public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) 
throws Exception {
         checkNotNull(callback, "write callback must be not null");
-        checkNotNull(writeRecord.getRow(), "row must not be null for log 
record");
+        InternalRow rowObj = writeRecord.getRow();
+        checkNotNull(rowObj, "row must not be null for log record");
         checkArgument(writeRecord.getKey() == null, "key must be null for log 
record");
         checkArgument(
                 writeRecord.getTargetColumns() == null,
                 "target columns must be null for log record");
-        checkArgument(
-                writeRecord.getRow() instanceof IndexedRow,
-                "row must not be IndexRow for indexed log table");
-        IndexedRow row = (IndexedRow) writeRecord.getRow();
+
+        R row = requireAndCastRow(rowObj);
         if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
             return false;
-        } else {
-            recordsBuilder.append(ChangeType.APPEND_ONLY, row);
-            recordCount++;
-            callbacks.add(callback);
-            return true;
         }
+        recordsBuilder.append(ChangeType.APPEND_ONLY, row);
+        recordCount++;
+        callbacks.add(callback);
+        return true;
+    }
+
+    protected abstract R requireAndCastRow(InternalRow row);
+
+    @Override
+    public boolean isLogBatch() {
+        return true;
     }
 
     @Override
@@ -93,7 +88,7 @@ public final class IndexedLogWriteBatch extends WriteBatch {
         try {
             return recordsBuilder.build();
         } catch (IOException e) {
-            throw new FlussRuntimeException("Failed to build indexed log 
record batch.", e);
+            throw new FlussRuntimeException(buildErrorMessage, e);
         }
     }
 
@@ -133,6 +128,7 @@ public final class IndexedLogWriteBatch extends WriteBatch {
         recordsBuilder.abort();
     }
 
+    @Override
     public void resetWriterState(long writerId, int batchSequence) {
         super.resetWriterState(writerId, batchSequence);
         recordsBuilder.resetWriterState(writerId, batchSequence);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
new file mode 100644
index 000000000..19366a4dc
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.rpc.messages.ProduceLogRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * A batch of log records managed in COMPACTED format that is or will be sent 
to server by {@link
+ * ProduceLogRequest}.
+ *
+ * <p>This class is not thread safe and external synchronization must be used 
when modifying it.
+ */
+@NotThreadSafe
+@Internal
+public final class CompactedLogWriteBatch extends 
AbstractRowLogWriteBatch<CompactedRow> {
+
+    public CompactedLogWriteBatch(
+            int bucketId,
+            PhysicalTablePath physicalTablePath,
+            int schemaId,
+            int writeLimit,
+            AbstractPagedOutputView outputView,
+            long createdMs) {
+        super(
+                bucketId,
+                physicalTablePath,
+                createdMs,
+                outputView,
+                MemoryLogRecordsCompactedBuilder.builder(schemaId, writeLimit, 
outputView, true),
+                "Failed to build compacted log record batch.");
+    }
+
+    @Override
+    protected CompactedRow requireAndCastRow(InternalRow row) {
+        checkArgument(
+                row instanceof CompactedRow, "row must be CompactedRow for 
compacted log table");
+        return (CompactedRow) row;
+    }
+}
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
index fb9655658..c70dd83a8 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
@@ -18,23 +18,15 @@
 package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
-import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.memory.AbstractPagedOutputView;
-import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
-import org.apache.fluss.record.bytesview.BytesView;
 import org.apache.fluss.row.indexed.IndexedRow;
 import org.apache.fluss.rpc.messages.ProduceLogRequest;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.IOException;
-import java.util.List;
-
 import static org.apache.fluss.utils.Preconditions.checkArgument;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
  * A batch of log records managed in INDEXED format that is or will be sent to 
server by {@link
@@ -44,9 +36,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkNotNull;
  */
 @NotThreadSafe
 @Internal
-public final class IndexedLogWriteBatch extends WriteBatch {
-    private final AbstractPagedOutputView outputView;
-    private final MemoryLogRecordsIndexedBuilder recordsBuilder;
+public final class IndexedLogWriteBatch extends 
AbstractRowLogWriteBatch<IndexedRow> {
 
     public IndexedLogWriteBatch(
             int bucketId,
@@ -55,91 +45,18 @@ public final class IndexedLogWriteBatch extends WriteBatch {
             int writeLimit,
             AbstractPagedOutputView outputView,
             long createdMs) {
-        super(bucketId, physicalTablePath, createdMs);
-        this.outputView = outputView;
-        this.recordsBuilder =
-                MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, 
outputView, true);
-    }
-
-    @Override
-    public boolean isLogBatch() {
-        return true;
-    }
-
-    @Override
-    public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) 
throws Exception {
-        checkNotNull(callback, "write callback must be not null");
-        checkNotNull(writeRecord.getRow(), "row must not be null for log 
record");
-        checkArgument(writeRecord.getKey() == null, "key must be null for log 
record");
-        checkArgument(
-                writeRecord.getTargetColumns() == null,
-                "target columns must be null for log record");
-        checkArgument(
-                writeRecord.getRow() instanceof IndexedRow,
-                "row must not be IndexRow for indexed log table");
-        IndexedRow row = (IndexedRow) writeRecord.getRow();
-        if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
-            return false;
-        } else {
-            recordsBuilder.append(ChangeType.APPEND_ONLY, row);
-            recordCount++;
-            callbacks.add(callback);
-            return true;
-        }
-    }
-
-    @Override
-    public BytesView build() {
-        try {
-            return recordsBuilder.build();
-        } catch (IOException e) {
-            throw new FlussRuntimeException("Failed to build indexed log 
record batch.", e);
-        }
-    }
-
-    @Override
-    public boolean isClosed() {
-        return recordsBuilder.isClosed();
-    }
-
-    @Override
-    public void close() throws Exception {
-        recordsBuilder.close();
-        reopened = false;
-    }
-
-    @Override
-    public List<MemorySegment> pooledMemorySegments() {
-        return outputView.allocatedPooledSegments();
-    }
-
-    @Override
-    public void setWriterState(long writerId, int batchSequence) {
-        recordsBuilder.setWriterState(writerId, batchSequence);
-    }
-
-    @Override
-    public long writerId() {
-        return recordsBuilder.writerId();
-    }
-
-    @Override
-    public int batchSequence() {
-        return recordsBuilder.batchSequence();
-    }
-
-    @Override
-    public void abortRecordAppends() {
-        recordsBuilder.abort();
-    }
-
-    public void resetWriterState(long writerId, int batchSequence) {
-        super.resetWriterState(writerId, batchSequence);
-        recordsBuilder.resetWriterState(writerId, batchSequence);
+        super(
+                bucketId,
+                physicalTablePath,
+                createdMs,
+                outputView,
+                MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, 
outputView, true),
+                "Failed to build indexed log record batch.");
     }
 
     @Override
-    public int estimatedSizeInBytes() {
-        return recordsBuilder.getSizeInBytes();
+    protected IndexedRow requireAndCastRow(org.apache.fluss.row.InternalRow 
row) {
+        checkArgument(row instanceof IndexedRow, "row must be IndexRow for 
indexed log table");
+        return (IndexedRow) row;
     }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 3a282ba92..0c834a49f 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -581,45 +581,15 @@ public final class RecordAccumulator {
         PreAllocatedPagedOutputView outputView = new 
PreAllocatedPagedOutputView(segments);
         int schemaId = tableInfo.getSchemaId();
         WriteFormat writeFormat = writeRecord.getWriteFormat();
-        // If the table is kv table we need to create a kv batch, otherwise we 
create a log batch.
-        final WriteBatch batch;
-        if (writeFormat == WriteFormat.KV) {
-            batch =
-                    new KvWriteBatch(
-                            bucketId,
-                            physicalTablePath,
-                            tableInfo.getSchemaId(),
-                            tableInfo.getTableConfig().getKvFormat(),
-                            outputView.getPreAllocatedSize(),
-                            outputView,
-                            writeRecord.getTargetColumns(),
-                            clock.milliseconds());
-        } else if (writeFormat == WriteFormat.ARROW_LOG) {
-            ArrowWriter arrowWriter =
-                    arrowWriterPool.getOrCreateWriter(
-                            tableInfo.getTableId(),
-                            schemaId,
-                            outputView.getPreAllocatedSize(),
-                            tableInfo.getRowType(),
-                            
tableInfo.getTableConfig().getArrowCompressionInfo());
-            batch =
-                    new ArrowLogWriteBatch(
-                            bucketId,
-                            physicalTablePath,
-                            tableInfo.getSchemaId(),
-                            arrowWriter,
-                            outputView,
-                            clock.milliseconds());
-        } else {
-            batch =
-                    new IndexedLogWriteBatch(
-                            bucketId,
-                            physicalTablePath,
-                            tableInfo.getSchemaId(),
-                            outputView.getPreAllocatedSize(),
-                            outputView,
-                            clock.milliseconds());
-        }
+        final WriteBatch batch =
+                createWriteBatch(
+                        writeRecord,
+                        bucketId,
+                        tableInfo,
+                        writeFormat,
+                        physicalTablePath,
+                        outputView,
+                        schemaId);
 
         batch.tryAppend(writeRecord, callback);
         deque.addLast(batch);
@@ -627,6 +597,67 @@ public final class RecordAccumulator {
         return new RecordAppendResult(deque.size() > 1 || batch.isClosed(), 
true, false);
     }
 
+    private WriteBatch createWriteBatch(
+            WriteRecord writeRecord,
+            int bucketId,
+            TableInfo tableInfo,
+            WriteFormat writeFormat,
+            PhysicalTablePath physicalTablePath,
+            PreAllocatedPagedOutputView outputView,
+            int schemaId) {
+        // If the table is kv table we need to create a kv batch, otherwise we 
create a log batch.
+        switch (writeFormat) {
+            case COMPACTED_KV:
+            case INDEXED_KV:
+                return new KvWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        tableInfo.getSchemaId(),
+                        writeFormat.toKvFormat(),
+                        outputView.getPreAllocatedSize(),
+                        outputView,
+                        writeRecord.getTargetColumns(),
+                        clock.milliseconds());
+
+            case ARROW_LOG:
+                ArrowWriter arrowWriter =
+                        arrowWriterPool.getOrCreateWriter(
+                                tableInfo.getTableId(),
+                                schemaId,
+                                outputView.getPreAllocatedSize(),
+                                tableInfo.getRowType(),
+                                
tableInfo.getTableConfig().getArrowCompressionInfo());
+                return new ArrowLogWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        tableInfo.getSchemaId(),
+                        arrowWriter,
+                        outputView,
+                        clock.milliseconds());
+
+            case COMPACTED_LOG:
+                return new CompactedLogWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        schemaId,
+                        outputView.getPreAllocatedSize(),
+                        outputView,
+                        clock.milliseconds());
+
+            case INDEXED_LOG:
+                return new IndexedLogWriteBatch(
+                        bucketId,
+                        physicalTablePath,
+                        tableInfo.getSchemaId(),
+                        outputView.getPreAllocatedSize(),
+                        outputView,
+                        clock.milliseconds());
+
+            default:
+                throw new UnsupportedOperationException("Unsupported write 
format: " + writeFormat);
+        }
+    }
+
     private RecordAppendResult tryAppend(
             WriteRecord writeRecord, WriteCallback callback, Deque<WriteBatch> 
deque)
             throws Exception {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
index 669c6807e..24f07c9ce 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
@@ -18,11 +18,52 @@
 package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.metadata.KvFormat;
 
 /** The format of the write record. */
 @Internal
 public enum WriteFormat {
-    ARROW_LOG,
-    INDEXED_LOG,
-    KV
+    ARROW_LOG(true),
+    INDEXED_LOG(true),
+    COMPACTED_LOG(true),
+    INDEXED_KV(false),
+    COMPACTED_KV(false);
+
+    private final boolean isLog;
+
+    WriteFormat(boolean isLog) {
+        this.isLog = isLog;
+    }
+
+    public boolean isLog() {
+        return isLog;
+    }
+
+    public boolean isKv() {
+        return !isLog;
+    }
+
+    /** Converts this {@link WriteFormat} to a {@link KvFormat}. */
+    public KvFormat toKvFormat() {
+        switch (this) {
+            case INDEXED_KV:
+                return KvFormat.INDEXED;
+            case COMPACTED_KV:
+                return KvFormat.COMPACTED;
+            default:
+                throw new IllegalArgumentException("WriteFormat " + this + " 
is not a KvFormat");
+        }
+    }
+
+    /** Converts a {@link KvFormat} to a {@link WriteFormat}. */
+    public static WriteFormat fromKvFormat(KvFormat kvFormat) {
+        switch (kvFormat) {
+            case INDEXED:
+                return INDEXED_KV;
+            case COMPACTED:
+                return COMPACTED_KV;
+            default:
+                throw new IllegalArgumentException("Unknown KvFormat: " + 
kvFormat);
+        }
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 68a3fe8a2..9265c5f77 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -20,17 +20,20 @@ package org.apache.fluss.client.write;
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.record.CompactedLogRecord;
 import org.apache.fluss.record.DefaultKvRecord;
-import org.apache.fluss.record.DefaultKvRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 
 import javax.annotation.Nullable;
 
+import static 
org.apache.fluss.record.DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
 import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
 import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
@@ -47,19 +50,20 @@ public final class WriteRecord {
             BinaryRow row,
             byte[] key,
             byte[] bucketKey,
+            WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
         checkNotNull(row, "row must not be null");
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "bucketKey must not be null");
-        int estimatedSizeInBytes =
-                DefaultKvRecord.sizeOf(key, row) + 
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
+        checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
+        int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, row) + 
RECORD_BATCH_HEADER_SIZE;
         return new WriteRecord(
                 tableInfo,
                 tablePath,
                 key,
                 bucketKey,
                 row,
-                WriteFormat.KV,
+                writeFormat,
                 targetColumns,
                 estimatedSizeInBytes);
     }
@@ -70,18 +74,19 @@ public final class WriteRecord {
             PhysicalTablePath tablePath,
             byte[] key,
             byte[] bucketKey,
+            WriteFormat writeFormat,
             @Nullable int[] targetColumns) {
         checkNotNull(key, "key must not be null");
         checkNotNull(bucketKey, "key must not be null");
-        int estimatedSizeInBytes =
-                DefaultKvRecord.sizeOf(key, null) + 
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
+        checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
+        int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, null) + 
RECORD_BATCH_HEADER_SIZE;
         return new WriteRecord(
                 tableInfo,
                 tablePath,
                 key,
                 bucketKey,
                 null,
-                WriteFormat.KV,
+                writeFormat,
                 targetColumns,
                 estimatedSizeInBytes);
     }
@@ -127,6 +132,26 @@ public final class WriteRecord {
                 estimatedSizeInBytes);
     }
 
+    /** Creates a write record for append operation for Compacted format. */
+    public static WriteRecord forCompactedAppend(
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            CompactedRow row,
+            @Nullable byte[] bucketKey) {
+        checkNotNull(row);
+        int estimatedSizeInBytes =
+                CompactedLogRecord.sizeOf(row) + 
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
+        return new WriteRecord(
+                tableInfo,
+                tablePath,
+                null,
+                bucketKey,
+                row,
+                WriteFormat.COMPACTED_LOG,
+                null,
+                estimatedSizeInBytes);
+    }
+
     // 
------------------------------------------------------------------------------------------
 
     private final PhysicalTablePath physicalTablePath;
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 0cc2f7f10..27b6690a9 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -742,7 +742,7 @@ class FlussTableITCase extends ClientToServerITCaseBase {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"INDEXED", "ARROW"})
+    @ValueSource(strings = {"INDEXED", "ARROW", "COMPACTED"})
     void testAppendAndPoll(String format) throws Exception {
         verifyAppendOrPut(true, format, null);
     }
@@ -1417,4 +1417,197 @@ class FlussTableITCase extends ClientToServerITCaseBase 
{
                             Collections.singletonMap("client.fs.test.key", 
"fs_test_value"));
         }
     }
+
+    // ---------------------- PK with COMPACTED log tests 
----------------------
+    @Test
+    void testPkUpsertAndPollWithCompactedLog() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .column("d", DataTypes.BIGINT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_pk_compacted_upsert_poll");
+        createTable(tablePath, tableDescriptor, false);
+
+        int expectedSize = 30;
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            for (int i = 0; i < expectedSize; i++) {
+                String value = i % 2 == 0 ? "hello, friend" + i : null;
+                GenericRow r = row(i, 100, value, i * 10L);
+                upsertWriter.upsert(r);
+                if (i % 10 == 0) {
+                    upsertWriter.flush();
+                }
+            }
+            upsertWriter.flush();
+
+            // normal scan
+            try (LogScanner logScanner = createLogScanner(table)) {
+                subscribeFromBeginning(logScanner, table);
+                int count = 0;
+                while (count < expectedSize) {
+                    ScanRecords scanRecords = 
logScanner.poll(Duration.ofSeconds(1));
+                    for (ScanRecord scanRecord : scanRecords) {
+                        
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
+                        InternalRow rr = scanRecord.getRow();
+                        assertThat(rr.getFieldCount()).isEqualTo(4);
+                        assertThat(rr.getInt(0)).isEqualTo(count);
+                        assertThat(rr.getInt(1)).isEqualTo(100);
+                        if (count % 2 == 0) {
+                            assertThat(rr.getString(2).toString())
+                                    .isEqualTo("hello, friend" + count);
+                        } else {
+                            assertThat(rr.isNullAt(2)).isTrue();
+                        }
+                        assertThat(rr.getLong(3)).isEqualTo(count * 10L);
+                        count++;
+                    }
+                }
+                assertThat(count).isEqualTo(expectedSize);
+            }
+
+            // Creating a projected log scanner for COMPACTED should work
+            try (LogScanner scanner = createLogScanner(table, new int[] {0, 
2})) {
+                subscribeFromBeginning(scanner, table);
+                int count = 0;
+                while (count < expectedSize) {
+                    ScanRecords records = scanner.poll(Duration.ofSeconds(1));
+                    for (ScanRecord record : records) {
+                        InternalRow row = record.getRow();
+                        assertThat(row.getFieldCount()).isEqualTo(2);
+                        assertThat(row.getInt(0)).isEqualTo(count);
+                        if (count % 2 == 0) {
+                            assertThat(row.getString(1).toString())
+                                    .isEqualTo("hello, friend" + count);
+                        } else {
+                            assertThat(row.isNullAt(1)).isTrue();
+                        }
+                        count++;
+                    }
+                }
+                assertThat(count).isEqualTo(expectedSize);
+            }
+        }
+    }
+
+    @Test
+    void testPkUpdateAndDeleteWithCompactedLog() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_pk_compacted_update_delete");
+        createTable(tablePath, tableDescriptor, false);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            // initial insert
+            upsertWriter.upsert(row(1, 10));
+            upsertWriter.flush();
+            // update same key
+            upsertWriter.upsert(row(1, 20));
+            upsertWriter.flush();
+            // delete the key
+            upsertWriter.delete(row(1, 20));
+            upsertWriter.flush();
+
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromBeginning(scanner, table);
+            // Expect: +I(1,10), -U(1,10), +U(1,20), -D(1,20)
+            ChangeType[] expected = {
+                ChangeType.INSERT,
+                ChangeType.UPDATE_BEFORE,
+                ChangeType.UPDATE_AFTER,
+                ChangeType.DELETE
+            };
+            int seen = 0;
+            while (seen < expected.length) {
+                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(expected[seen]);
+                    InternalRow row = r.getRow();
+                    assertThat(row.getInt(0)).isEqualTo(1);
+                    // value field present
+                    if (expected[seen] == ChangeType.UPDATE_AFTER
+                            || expected[seen] == ChangeType.DELETE) {
+                        assertThat(row.getInt(1)).isEqualTo(20);
+                    } else {
+                        assertThat(row.getInt(1)).isEqualTo(10);
+                    }
+                    seen++;
+                }
+            }
+            assertThat(seen).isEqualTo(expected.length);
+            scanner.close();
+        }
+    }
+
+    @Test
+    void testPkCompactedPollFromLatestNoRecords() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .kvFormat(KvFormat.COMPACTED)
+                        .logFormat(LogFormat.COMPACTED)
+                        .build();
+        TablePath path = TablePath.of("test_db_1", "test_pk_compacted_latest");
+        createTable(path, td, false);
+
+        try (Table table = conn.getTable(path)) {
+            LogScanner scanner = createLogScanner(table);
+            subscribeFromLatestOffset(path, null, null, table, scanner, admin);
+            // Now write a few rows and ensure only these are seen
+            UpsertWriter upsert = table.newUpsert().createWriter();
+            for (int i = 0; i < 5; i++) {
+                upsert.upsert(row(i, i));
+            }
+            upsert.flush();
+
+            int seen = 0;
+            while (seen < 5) {
+                ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+                for (ScanRecord r : recs) {
+                    assertThat(r.getChangeType()).isEqualTo(ChangeType.INSERT);
+                    assertThat(r.getRow().getInt(0)).isBetween(0, 4);
+                    seen++;
+                }
+            }
+
+            // delete non-existent key
+            upsert.delete(row(42, 0));
+            upsert.flush();
+            // poll a few times to ensure no accidental records
+            int total = 0;
+            for (int i = 0; i < 3; i++) {
+                total += scanner.poll(Duration.ofSeconds(1)).count();
+            }
+            assertThat(total).isEqualTo(0);
+
+            scanner.close();
+        }
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
similarity index 83%
copy from 
fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
copy to 
fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
index db39493bd..cbd6680a7 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
@@ -19,17 +19,15 @@ package org.apache.fluss.client.write;
 
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.PreAllocatedPagedOutputView;
-import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.IndexedLogRecord;
+import org.apache.fluss.record.CompactedLogRecord;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.record.TestingSchemaGetter;
 import org.apache.fluss.record.bytesview.BytesView;
-import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.utils.CloseableIterator;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -45,40 +43,38 @@ import static 
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
 import static 
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
-import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
-import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link IndexedLogWriteBatch}. */
-public class IndexedLogWriteBatchTest {
-    private IndexedRow row;
+/** Test for {@link CompactedLogWriteBatch}. */
+public class CompactedLogWriteBatchTest {
+    private CompactedRow row;
     private int estimatedSizeInBytes;
 
     @BeforeEach
     void setup() {
-        row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
-        estimatedSizeInBytes = IndexedLogRecord.sizeOf(row);
+        row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+        estimatedSizeInBytes = CompactedLogRecord.sizeOf(row);
     }
 
     @Test
     void testTryAppendWithWriteLimit() throws Exception {
         int bucketId = 0;
         int writeLimit = 100;
-        IndexedLogWriteBatch logProducerBatch =
+        CompactedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(
                         new TableBucket(DATA1_TABLE_ID, bucketId),
                         0L,
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
-        for (int i = 0;
-                i
-                        < (writeLimit - 
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
-                                / estimatedSizeInBytes;
-                i++) {
+        int maxRecordsPerBatch =
+                (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
+                        / estimatedSizeInBytes;
+        for (int i = 0; i < maxRecordsPerBatch; i++) {
             boolean appendResult =
                     logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
             assertThat(appendResult).isTrue();
@@ -92,7 +88,7 @@ public class IndexedLogWriteBatchTest {
     @Test
     void testToBytes() throws Exception {
         int bucketId = 0;
-        IndexedLogWriteBatch logProducerBatch =
+        CompactedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
         boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
@@ -101,15 +97,14 @@ public class IndexedLogWriteBatchTest {
         BytesView bytesView = logProducerBatch.build();
         MemoryLogRecords logRecords = 
MemoryLogRecords.pointToBytesView(bytesView);
         Iterator<LogRecordBatch> iterator = logRecords.batches().iterator();
-        assertDefaultLogRecordBatchEquals(
-                iterator.next(), new TestingSchemaGetter(new 
SchemaInfo(DATA1_SCHEMA, (short) 1)));
+        assertDefaultLogRecordBatchEquals(iterator.next());
         assertThat(iterator.hasNext()).isFalse();
     }
 
     @Test
     void testCompleteTwice() throws Exception {
         int bucketId = 0;
-        IndexedLogWriteBatch logWriteBatch =
+        CompactedLogWriteBatch logWriteBatch =
                 createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
         boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
@@ -124,7 +119,7 @@ public class IndexedLogWriteBatchTest {
     @Test
     void testFailedTwice() throws Exception {
         int bucketId = 0;
-        IndexedLogWriteBatch logWriteBatch =
+        CompactedLogWriteBatch logWriteBatch =
                 createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
         boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
@@ -139,7 +134,7 @@ public class IndexedLogWriteBatchTest {
     @Test
     void testClose() throws Exception {
         int bucketId = 0;
-        IndexedLogWriteBatch logProducerBatch =
+        CompactedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
         boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
@@ -155,7 +150,7 @@ public class IndexedLogWriteBatchTest {
     void testBatchAborted() throws Exception {
         int bucketId = 0;
         int writeLimit = 10240;
-        IndexedLogWriteBatch logProducerBatch =
+        CompactedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(
                         new TableBucket(DATA1_TABLE_ID, bucketId),
                         0L,
@@ -186,7 +181,7 @@ public class IndexedLogWriteBatchTest {
                         () -> logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback()))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining(
-                        "Tried to append a record, but 
MemoryLogRecordsIndexedBuilder has already been aborted");
+                        "Tried to append a record, but 
MemoryLogRecordsCompactedBuilder has already been aborted");
 
         // try to build.
         assertThatThrownBy(logProducerBatch::build)
@@ -203,18 +198,19 @@ public class IndexedLogWriteBatchTest {
     }
 
     private WriteRecord createWriteRecord() {
-        return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO, 
DATA1_PHYSICAL_TABLE_PATH, row, null);
+        return WriteRecord.forCompactedAppend(
+                DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null);
     }
 
-    private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb, long 
baseLogOffset)
+    private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb, long 
baseLogOffset)
             throws Exception {
         return createLogWriteBatch(
                 tb, baseLogOffset, Integer.MAX_VALUE, 
MemorySegment.allocateHeapMemory(1000));
     }
 
-    private IndexedLogWriteBatch createLogWriteBatch(
+    private CompactedLogWriteBatch createLogWriteBatch(
             TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment 
memorySegment) {
-        return new IndexedLogWriteBatch(
+        return new CompactedLogWriteBatch(
                 tb.getBucket(),
                 DATA1_PHYSICAL_TABLE_PATH,
                 DATA1_TABLE_INFO.getSchemaId(),
@@ -223,14 +219,13 @@ public class IndexedLogWriteBatchTest {
                 System.currentTimeMillis());
     }
 
-    private void assertDefaultLogRecordBatchEquals(
-            LogRecordBatch recordBatch, TestingSchemaGetter schemaGetter) {
+    private void assertDefaultLogRecordBatchEquals(LogRecordBatch recordBatch) 
{
         assertThat(recordBatch.getRecordCount()).isEqualTo(1);
         assertThat(recordBatch.baseLogOffset()).isEqualTo(0L);
         assertThat(recordBatch.schemaId()).isEqualTo((short) 
DATA1_TABLE_INFO.getSchemaId());
         try (LogRecordReadContext readContext =
-                        LogRecordReadContext.createIndexedReadContext(
-                                DATA1_ROW_TYPE, 
DATA1_TABLE_INFO.getSchemaId(), schemaGetter);
+                        LogRecordReadContext.createCompactedRowReadContext(
+                                DATA1_ROW_TYPE, 
DATA1_TABLE_INFO.getSchemaId());
                 CloseableIterator<LogRecord> iterator = 
recordBatch.records(readContext)) {
             assertThat(iterator.hasNext()).isTrue();
             LogRecord record = iterator.next();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index db39493bd..aca1cde1b 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -74,11 +74,10 @@ public class IndexedLogWriteBatchTest {
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
-        for (int i = 0;
-                i
-                        < (writeLimit - 
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
-                                / estimatedSizeInBytes;
-                i++) {
+        int maxRecordsPerBatch =
+                (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
+                        / estimatedSizeInBytes;
+        for (int i = 0; i < maxRecordsPerBatch; i++) {
             boolean appendResult =
                     logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
             assertThat(appendResult).isTrue();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index 82f6ee590..c62c7b029 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -207,6 +207,7 @@ class KvWriteBatchTest {
                 row,
                 key,
                 key,
+                WriteFormat.COMPACTED_KV,
                 null);
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
index e23aa5cc3..13cda3325 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
@@ -30,8 +30,7 @@ import static 
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
  * Default builder for {@link MemoryLogRecords} of log records in {@link 
LogFormat#COMPACTED}
  * format.
  */
-public class MemoryLogRecordsCompactedBuilder
-        extends AbstractRowMemoryLogRecordsBuilder<CompactedRow> {
+public class MemoryLogRecordsCompactedBuilder extends 
MemoryLogRecordsRowBuilder<CompactedRow> {
 
     private MemoryLogRecordsCompactedBuilder(
             long baseLogOffset,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index a713c34ee..34688ef55 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -29,7 +29,7 @@ import static 
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
  * Default builder for {@link MemoryLogRecords} of log records in {@link
  * org.apache.fluss.metadata.LogFormat#INDEXED} format.
  */
-public class MemoryLogRecordsIndexedBuilder extends 
AbstractRowMemoryLogRecordsBuilder<IndexedRow> {
+public class MemoryLogRecordsIndexedBuilder extends 
MemoryLogRecordsRowBuilder<IndexedRow> {
 
     private MemoryLogRecordsIndexedBuilder(
             long baseLogOffset,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
similarity index 98%
rename from 
fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
rename to 
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
index 423b35a3b..127503aa0 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
@@ -39,7 +39,7 @@ import static 
org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /** Abstract base builder for row-based MemoryLogRecords builders sharing 
common logic. */
-abstract class AbstractRowMemoryLogRecordsBuilder<T> implements AutoCloseable {
+public abstract class MemoryLogRecordsRowBuilder<T> implements AutoCloseable {
     protected static final int BUILDER_DEFAULT_OFFSET = 0;
 
     protected final long baseLogOffset;
@@ -59,7 +59,7 @@ abstract class AbstractRowMemoryLogRecordsBuilder<T> 
implements AutoCloseable {
     private volatile boolean isClosed;
     private boolean aborted = false;
 
-    protected AbstractRowMemoryLogRecordsBuilder(
+    protected MemoryLogRecordsRowBuilder(
             long baseLogOffset,
             int schemaId,
             int writeLimit,

Reply via email to