This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a908690ed [common] Support COMPACTED format for log tables (#1605)
a908690ed is described below
commit a908690ed465d1a4349feebb9b92f80c61a41053
Author: Giannis Polyzos <[email protected]>
AuthorDate: Thu Dec 25 13:24:19 2025 +0200
[common] Support COMPACTED format for log tables (#1605)
Co-authored-by: Jark Wu <[email protected]>
---
.../client/table/writer/AppendWriterImpl.java | 25 +++
.../client/table/writer/UpsertWriterImpl.java | 11 +-
...iteBatch.java => AbstractRowLogWriteBatch.java} | 68 ++++---
.../fluss/client/write/CompactedLogWriteBatch.java | 64 +++++++
.../fluss/client/write/IndexedLogWriteBatch.java | 105 ++---------
.../fluss/client/write/RecordAccumulator.java | 109 +++++++-----
.../org/apache/fluss/client/write/WriteFormat.java | 47 ++++-
.../org/apache/fluss/client/write/WriteRecord.java | 39 ++++-
.../fluss/client/table/FlussTableITCase.java | 195 ++++++++++++++++++++-
...chTest.java => CompactedLogWriteBatchTest.java} | 61 +++----
.../client/write/IndexedLogWriteBatchTest.java | 9 +-
.../fluss/client/write/KvWriteBatchTest.java | 1 +
.../record/MemoryLogRecordsCompactedBuilder.java | 3 +-
.../record/MemoryLogRecordsIndexedBuilder.java | 2 +-
...uilder.java => MemoryLogRecordsRowBuilder.java} | 4 +-
15 files changed, 519 insertions(+), 224 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
index d702e9621..b3a3f8131 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
@@ -26,9 +26,12 @@ import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
import org.apache.fluss.row.encode.IndexedRowEncoder;
import org.apache.fluss.row.encode.KeyEncoder;
import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
@@ -44,6 +47,7 @@ class AppendWriterImpl extends AbstractTableWriter implements
AppendWriter {
private final LogFormat logFormat;
private final IndexedRowEncoder indexedRowEncoder;
+ private final CompactedRowEncoder compactedRowEncoder;
private final FieldGetter[] fieldGetters;
private final TableInfo tableInfo;
@@ -58,8 +62,12 @@ class AppendWriterImpl extends AbstractTableWriter
implements AppendWriter {
this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys,
lakeFormat);
}
+ DataType[] fieldDataTypes =
+ tableInfo.getSchema().getRowType().getChildren().toArray(new
DataType[0]);
+
this.logFormat = tableInfo.getTableConfig().getLogFormat();
this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
+ this.compactedRowEncoder = new CompactedRowEncoder(fieldDataTypes);
this.fieldGetters =
InternalRow.createFieldGetters(tableInfo.getRowType());
this.tableInfo = tableInfo;
}
@@ -80,6 +88,11 @@ class AppendWriterImpl extends AbstractTableWriter
implements AppendWriter {
if (logFormat == LogFormat.INDEXED) {
IndexedRow indexedRow = encodeIndexedRow(row);
record = WriteRecord.forIndexedAppend(tableInfo, physicalPath,
indexedRow, bucketKey);
+ } else if (logFormat == LogFormat.COMPACTED) {
+ CompactedRow compactedRow = encodeCompactedRow(row);
+ record =
+ WriteRecord.forCompactedAppend(
+ tableInfo, physicalPath, compactedRow, bucketKey);
} else {
// ARROW format supports general internal row
record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row,
bucketKey);
@@ -87,6 +100,18 @@ class AppendWriterImpl extends AbstractTableWriter
implements AppendWriter {
return send(record).thenApply(ignored -> APPEND_SUCCESS);
}
+ private CompactedRow encodeCompactedRow(InternalRow row) {
+ if (row instanceof CompactedRow) {
+ return (CompactedRow) row;
+ }
+
+ compactedRowEncoder.startNewRow();
+ for (int i = 0; i < fieldCount; i++) {
+ compactedRowEncoder.encodeField(i,
fieldGetters[i].getFieldOrNull(row));
+ }
+ return compactedRowEncoder.finishRow();
+ }
+
private IndexedRow encodeIndexedRow(InternalRow row) {
if (row instanceof IndexedRow) {
return (IndexedRow) row;
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 39f65592c..20e5bcc5e 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -17,6 +17,7 @@
package org.apache.fluss.client.table.writer;
+import org.apache.fluss.client.write.WriteFormat;
import org.apache.fluss.client.write.WriteRecord;
import org.apache.fluss.client.write.WriterClient;
import org.apache.fluss.metadata.DataLakeFormat;
@@ -50,6 +51,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements
UpsertWriter {
private final KeyEncoder bucketKeyEncoder;
private final KvFormat kvFormat;
+ private final WriteFormat writeFormat;
private final RowEncoder rowEncoder;
private final FieldGetter[] fieldGetters;
private final TableInfo tableInfo;
@@ -78,6 +80,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements
UpsertWriter {
: KeyEncoder.of(rowType, tableInfo.getBucketKeys(),
lakeFormat);
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
+ this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
this.fieldGetters = InternalRow.createFieldGetters(rowType);
this.tableInfo = tableInfo;
@@ -164,6 +167,7 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
encodeRow(row),
key,
bucketKey,
+ writeFormat,
targetColumns);
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
}
@@ -182,7 +186,12 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
bucketKeyEncoder == primaryKeyEncoder ? key :
bucketKeyEncoder.encodeKey(row);
WriteRecord record =
WriteRecord.forDelete(
- tableInfo, getPhysicalPath(row), key, bucketKey,
targetColumns);
+ tableInfo,
+ getPhysicalPath(row),
+ key,
+ bucketKey,
+ writeFormat,
+ targetColumns);
return send(record).thenApply(ignored -> DELETE_SUCCESS);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
similarity index 70%
copy from
fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
copy to
fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
index fb9655658..3c61cce82 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
@@ -17,18 +17,14 @@
package org.apache.fluss.client.write;
-import org.apache.fluss.annotation.Internal;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.memory.AbstractPagedOutputView;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
+import org.apache.fluss.record.MemoryLogRecordsRowBuilder;
import org.apache.fluss.record.bytesview.BytesView;
-import org.apache.fluss.row.indexed.IndexedRow;
-import org.apache.fluss.rpc.messages.ProduceLogRequest;
-
-import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.fluss.row.InternalRow;
import java.io.IOException;
import java.util.List;
@@ -37,55 +33,54 @@ import static
org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/**
- * A batch of log records managed in INDEXED format that is or will be sent to
server by {@link
- * ProduceLogRequest}.
- *
- * <p>This class is not thread safe and external synchronization must be used
when modifying it.
+ * Abstract base class to deduplicate logic for row-based log write batches
backed by in-memory
+ * builders. Concrete subclasses only need to provide a row-type
validator/caster and a
+ * RecordsBuilderAdapter implementation.
*/
-@NotThreadSafe
-@Internal
-public final class IndexedLogWriteBatch extends WriteBatch {
+abstract class AbstractRowLogWriteBatch<R> extends WriteBatch {
+
private final AbstractPagedOutputView outputView;
- private final MemoryLogRecordsIndexedBuilder recordsBuilder;
+ private final MemoryLogRecordsRowBuilder<R> recordsBuilder;
+ private final String buildErrorMessage;
- public IndexedLogWriteBatch(
+ protected AbstractRowLogWriteBatch(
int bucketId,
PhysicalTablePath physicalTablePath,
- int schemaId,
- int writeLimit,
+ long createdMs,
AbstractPagedOutputView outputView,
- long createdMs) {
+ MemoryLogRecordsRowBuilder<R> recordsBuilder,
+ String buildErrorMessage) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
- this.recordsBuilder =
- MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit,
outputView, true);
- }
-
- @Override
- public boolean isLogBatch() {
- return true;
+ this.recordsBuilder = recordsBuilder;
+ this.buildErrorMessage = buildErrorMessage;
}
@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
checkNotNull(callback, "write callback must be not null");
- checkNotNull(writeRecord.getRow(), "row must not be null for log
record");
+ InternalRow rowObj = writeRecord.getRow();
+ checkNotNull(rowObj, "row must not be null for log record");
checkArgument(writeRecord.getKey() == null, "key must be null for log
record");
checkArgument(
writeRecord.getTargetColumns() == null,
"target columns must be null for log record");
- checkArgument(
- writeRecord.getRow() instanceof IndexedRow,
- "row must not be IndexRow for indexed log table");
- IndexedRow row = (IndexedRow) writeRecord.getRow();
+
+ R row = requireAndCastRow(rowObj);
if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
return false;
- } else {
- recordsBuilder.append(ChangeType.APPEND_ONLY, row);
- recordCount++;
- callbacks.add(callback);
- return true;
}
+ recordsBuilder.append(ChangeType.APPEND_ONLY, row);
+ recordCount++;
+ callbacks.add(callback);
+ return true;
+ }
+
+ protected abstract R requireAndCastRow(InternalRow row);
+
+ @Override
+ public boolean isLogBatch() {
+ return true;
}
@Override
@@ -93,7 +88,7 @@ public final class IndexedLogWriteBatch extends WriteBatch {
try {
return recordsBuilder.build();
} catch (IOException e) {
- throw new FlussRuntimeException("Failed to build indexed log
record batch.", e);
+ throw new FlussRuntimeException(buildErrorMessage, e);
}
}
@@ -133,6 +128,7 @@ public final class IndexedLogWriteBatch extends WriteBatch {
recordsBuilder.abort();
}
+ @Override
public void resetWriterState(long writerId, int batchSequence) {
super.resetWriterState(writerId, batchSequence);
recordsBuilder.resetWriterState(writerId, batchSequence);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
new file mode 100644
index 000000000..19366a4dc
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.rpc.messages.ProduceLogRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * A batch of log records managed in COMPACTED format that is or will be sent
to server by {@link
+ * ProduceLogRequest}.
+ *
+ * <p>This class is not thread safe and external synchronization must be used
when modifying it.
+ */
+@NotThreadSafe
+@Internal
+public final class CompactedLogWriteBatch extends
AbstractRowLogWriteBatch<CompactedRow> {
+
+ public CompactedLogWriteBatch(
+ int bucketId,
+ PhysicalTablePath physicalTablePath,
+ int schemaId,
+ int writeLimit,
+ AbstractPagedOutputView outputView,
+ long createdMs) {
+ super(
+ bucketId,
+ physicalTablePath,
+ createdMs,
+ outputView,
+ MemoryLogRecordsCompactedBuilder.builder(schemaId, writeLimit,
outputView, true),
+ "Failed to build compacted log record batch.");
+ }
+
+ @Override
+ protected CompactedRow requireAndCastRow(InternalRow row) {
+ checkArgument(
+ row instanceof CompactedRow, "row must be CompactedRow for
compacted log table");
+ return (CompactedRow) row;
+ }
+}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
index fb9655658..c70dd83a8 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
@@ -18,23 +18,15 @@
package org.apache.fluss.client.write;
import org.apache.fluss.annotation.Internal;
-import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.memory.AbstractPagedOutputView;
-import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
-import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.IOException;
-import java.util.List;
-
import static org.apache.fluss.utils.Preconditions.checkArgument;
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
/**
* A batch of log records managed in INDEXED format that is or will be sent to
server by {@link
@@ -44,9 +36,7 @@ import static
org.apache.fluss.utils.Preconditions.checkNotNull;
*/
@NotThreadSafe
@Internal
-public final class IndexedLogWriteBatch extends WriteBatch {
- private final AbstractPagedOutputView outputView;
- private final MemoryLogRecordsIndexedBuilder recordsBuilder;
+public final class IndexedLogWriteBatch extends
AbstractRowLogWriteBatch<IndexedRow> {
public IndexedLogWriteBatch(
int bucketId,
@@ -55,91 +45,18 @@ public final class IndexedLogWriteBatch extends WriteBatch {
int writeLimit,
AbstractPagedOutputView outputView,
long createdMs) {
- super(bucketId, physicalTablePath, createdMs);
- this.outputView = outputView;
- this.recordsBuilder =
- MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit,
outputView, true);
- }
-
- @Override
- public boolean isLogBatch() {
- return true;
- }
-
- @Override
- public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
- checkNotNull(callback, "write callback must be not null");
- checkNotNull(writeRecord.getRow(), "row must not be null for log
record");
- checkArgument(writeRecord.getKey() == null, "key must be null for log
record");
- checkArgument(
- writeRecord.getTargetColumns() == null,
- "target columns must be null for log record");
- checkArgument(
- writeRecord.getRow() instanceof IndexedRow,
- "row must not be IndexRow for indexed log table");
- IndexedRow row = (IndexedRow) writeRecord.getRow();
- if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
- return false;
- } else {
- recordsBuilder.append(ChangeType.APPEND_ONLY, row);
- recordCount++;
- callbacks.add(callback);
- return true;
- }
- }
-
- @Override
- public BytesView build() {
- try {
- return recordsBuilder.build();
- } catch (IOException e) {
- throw new FlussRuntimeException("Failed to build indexed log
record batch.", e);
- }
- }
-
- @Override
- public boolean isClosed() {
- return recordsBuilder.isClosed();
- }
-
- @Override
- public void close() throws Exception {
- recordsBuilder.close();
- reopened = false;
- }
-
- @Override
- public List<MemorySegment> pooledMemorySegments() {
- return outputView.allocatedPooledSegments();
- }
-
- @Override
- public void setWriterState(long writerId, int batchSequence) {
- recordsBuilder.setWriterState(writerId, batchSequence);
- }
-
- @Override
- public long writerId() {
- return recordsBuilder.writerId();
- }
-
- @Override
- public int batchSequence() {
- return recordsBuilder.batchSequence();
- }
-
- @Override
- public void abortRecordAppends() {
- recordsBuilder.abort();
- }
-
- public void resetWriterState(long writerId, int batchSequence) {
- super.resetWriterState(writerId, batchSequence);
- recordsBuilder.resetWriterState(writerId, batchSequence);
+ super(
+ bucketId,
+ physicalTablePath,
+ createdMs,
+ outputView,
+ MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit,
outputView, true),
+ "Failed to build indexed log record batch.");
}
@Override
- public int estimatedSizeInBytes() {
- return recordsBuilder.getSizeInBytes();
+ protected IndexedRow requireAndCastRow(org.apache.fluss.row.InternalRow
row) {
+ checkArgument(row instanceof IndexedRow, "row must be IndexRow for
indexed log table");
+ return (IndexedRow) row;
}
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 3a282ba92..0c834a49f 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -581,45 +581,15 @@ public final class RecordAccumulator {
PreAllocatedPagedOutputView outputView = new
PreAllocatedPagedOutputView(segments);
int schemaId = tableInfo.getSchemaId();
WriteFormat writeFormat = writeRecord.getWriteFormat();
- // If the table is kv table we need to create a kv batch, otherwise we
create a log batch.
- final WriteBatch batch;
- if (writeFormat == WriteFormat.KV) {
- batch =
- new KvWriteBatch(
- bucketId,
- physicalTablePath,
- tableInfo.getSchemaId(),
- tableInfo.getTableConfig().getKvFormat(),
- outputView.getPreAllocatedSize(),
- outputView,
- writeRecord.getTargetColumns(),
- clock.milliseconds());
- } else if (writeFormat == WriteFormat.ARROW_LOG) {
- ArrowWriter arrowWriter =
- arrowWriterPool.getOrCreateWriter(
- tableInfo.getTableId(),
- schemaId,
- outputView.getPreAllocatedSize(),
- tableInfo.getRowType(),
-
tableInfo.getTableConfig().getArrowCompressionInfo());
- batch =
- new ArrowLogWriteBatch(
- bucketId,
- physicalTablePath,
- tableInfo.getSchemaId(),
- arrowWriter,
- outputView,
- clock.milliseconds());
- } else {
- batch =
- new IndexedLogWriteBatch(
- bucketId,
- physicalTablePath,
- tableInfo.getSchemaId(),
- outputView.getPreAllocatedSize(),
- outputView,
- clock.milliseconds());
- }
+ final WriteBatch batch =
+ createWriteBatch(
+ writeRecord,
+ bucketId,
+ tableInfo,
+ writeFormat,
+ physicalTablePath,
+ outputView,
+ schemaId);
batch.tryAppend(writeRecord, callback);
deque.addLast(batch);
@@ -627,6 +597,67 @@ public final class RecordAccumulator {
return new RecordAppendResult(deque.size() > 1 || batch.isClosed(),
true, false);
}
+ private WriteBatch createWriteBatch(
+ WriteRecord writeRecord,
+ int bucketId,
+ TableInfo tableInfo,
+ WriteFormat writeFormat,
+ PhysicalTablePath physicalTablePath,
+ PreAllocatedPagedOutputView outputView,
+ int schemaId) {
+ // If the table is kv table we need to create a kv batch, otherwise we
create a log batch.
+ switch (writeFormat) {
+ case COMPACTED_KV:
+ case INDEXED_KV:
+ return new KvWriteBatch(
+ bucketId,
+ physicalTablePath,
+ tableInfo.getSchemaId(),
+ writeFormat.toKvFormat(),
+ outputView.getPreAllocatedSize(),
+ outputView,
+ writeRecord.getTargetColumns(),
+ clock.milliseconds());
+
+ case ARROW_LOG:
+ ArrowWriter arrowWriter =
+ arrowWriterPool.getOrCreateWriter(
+ tableInfo.getTableId(),
+ schemaId,
+ outputView.getPreAllocatedSize(),
+ tableInfo.getRowType(),
+
tableInfo.getTableConfig().getArrowCompressionInfo());
+ return new ArrowLogWriteBatch(
+ bucketId,
+ physicalTablePath,
+ tableInfo.getSchemaId(),
+ arrowWriter,
+ outputView,
+ clock.milliseconds());
+
+ case COMPACTED_LOG:
+ return new CompactedLogWriteBatch(
+ bucketId,
+ physicalTablePath,
+ schemaId,
+ outputView.getPreAllocatedSize(),
+ outputView,
+ clock.milliseconds());
+
+ case INDEXED_LOG:
+ return new IndexedLogWriteBatch(
+ bucketId,
+ physicalTablePath,
+ tableInfo.getSchemaId(),
+ outputView.getPreAllocatedSize(),
+ outputView,
+ clock.milliseconds());
+
+ default:
+ throw new UnsupportedOperationException("Unsupported write
format: " + writeFormat);
+ }
+ }
+
private RecordAppendResult tryAppend(
WriteRecord writeRecord, WriteCallback callback, Deque<WriteBatch>
deque)
throws Exception {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
index 669c6807e..24f07c9ce 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteFormat.java
@@ -18,11 +18,52 @@
package org.apache.fluss.client.write;
import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.metadata.KvFormat;
/** The format of the write record. */
@Internal
public enum WriteFormat {
- ARROW_LOG,
- INDEXED_LOG,
- KV
+ ARROW_LOG(true),
+ INDEXED_LOG(true),
+ COMPACTED_LOG(true),
+ INDEXED_KV(false),
+ COMPACTED_KV(false);
+
+ private final boolean isLog;
+
+ WriteFormat(boolean isLog) {
+ this.isLog = isLog;
+ }
+
+ public boolean isLog() {
+ return isLog;
+ }
+
+ public boolean isKv() {
+ return !isLog;
+ }
+
+ /** Converts this {@link WriteFormat} to a {@link KvFormat}. */
+ public KvFormat toKvFormat() {
+ switch (this) {
+ case INDEXED_KV:
+ return KvFormat.INDEXED;
+ case COMPACTED_KV:
+ return KvFormat.COMPACTED;
+ default:
+ throw new IllegalArgumentException("WriteFormat " + this + "
is not a KvFormat");
+ }
+ }
+
+ /** Converts a {@link KvFormat} to a {@link WriteFormat}. */
+ public static WriteFormat fromKvFormat(KvFormat kvFormat) {
+ switch (kvFormat) {
+ case INDEXED:
+ return INDEXED_KV;
+ case COMPACTED:
+ return COMPACTED_KV;
+ default:
+ throw new IllegalArgumentException("Unknown KvFormat: " +
kvFormat);
+ }
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 68a3fe8a2..9265c5f77 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -20,17 +20,20 @@ package org.apache.fluss.client.write;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.record.CompactedLogRecord;
import org.apache.fluss.record.DefaultKvRecord;
-import org.apache.fluss.record.DefaultKvRecordBatch;
import org.apache.fluss.record.IndexedLogRecord;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.indexed.IndexedRow;
import javax.annotation.Nullable;
+import static
org.apache.fluss.record.DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/**
@@ -47,19 +50,20 @@ public final class WriteRecord {
BinaryRow row,
byte[] key,
byte[] bucketKey,
+ WriteFormat writeFormat,
@Nullable int[] targetColumns) {
checkNotNull(row, "row must not be null");
checkNotNull(key, "key must not be null");
checkNotNull(bucketKey, "bucketKey must not be null");
- int estimatedSizeInBytes =
- DefaultKvRecord.sizeOf(key, row) +
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
+ checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
+ int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, row) +
RECORD_BATCH_HEADER_SIZE;
return new WriteRecord(
tableInfo,
tablePath,
key,
bucketKey,
row,
- WriteFormat.KV,
+ writeFormat,
targetColumns,
estimatedSizeInBytes);
}
@@ -70,18 +74,19 @@ public final class WriteRecord {
PhysicalTablePath tablePath,
byte[] key,
byte[] bucketKey,
+ WriteFormat writeFormat,
@Nullable int[] targetColumns) {
checkNotNull(key, "key must not be null");
checkNotNull(bucketKey, "key must not be null");
- int estimatedSizeInBytes =
- DefaultKvRecord.sizeOf(key, null) +
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
+ checkArgument(writeFormat.isKv(), "writeFormat must be a KV format");
+ int estimatedSizeInBytes = DefaultKvRecord.sizeOf(key, null) +
RECORD_BATCH_HEADER_SIZE;
return new WriteRecord(
tableInfo,
tablePath,
key,
bucketKey,
null,
- WriteFormat.KV,
+ writeFormat,
targetColumns,
estimatedSizeInBytes);
}
@@ -127,6 +132,26 @@ public final class WriteRecord {
estimatedSizeInBytes);
}
+ /** Creates a write record for append operation for Compacted format. */
+ public static WriteRecord forCompactedAppend(
+ TableInfo tableInfo,
+ PhysicalTablePath tablePath,
+ CompactedRow row,
+ @Nullable byte[] bucketKey) {
+ checkNotNull(row);
+ int estimatedSizeInBytes =
+ CompactedLogRecord.sizeOf(row) +
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
+ return new WriteRecord(
+ tableInfo,
+ tablePath,
+ null,
+ bucketKey,
+ row,
+ WriteFormat.COMPACTED_LOG,
+ null,
+ estimatedSizeInBytes);
+ }
+
//
------------------------------------------------------------------------------------------
private final PhysicalTablePath physicalTablePath;
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 0cc2f7f10..27b6690a9 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -742,7 +742,7 @@ class FlussTableITCase extends ClientToServerITCaseBase {
}
@ParameterizedTest
- @ValueSource(strings = {"INDEXED", "ARROW"})
+ @ValueSource(strings = {"INDEXED", "ARROW", "COMPACTED"})
void testAppendAndPoll(String format) throws Exception {
verifyAppendOrPut(true, format, null);
}
@@ -1417,4 +1417,197 @@ class FlussTableITCase extends ClientToServerITCaseBase
{
Collections.singletonMap("client.fs.test.key",
"fs_test_value"));
}
}
+
+ // ---------------------- PK with COMPACTED log tests
----------------------
+ @Test
+ void testPkUpsertAndPollWithCompactedLog() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.STRING())
+ .column("d", DataTypes.BIGINT())
+ .primaryKey("a")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(schema)
+ .kvFormat(KvFormat.COMPACTED)
+ .logFormat(LogFormat.COMPACTED)
+ .build();
+ TablePath tablePath = TablePath.of("test_db_1",
"test_pk_compacted_upsert_poll");
+ createTable(tablePath, tableDescriptor, false);
+
+ int expectedSize = 30;
+ try (Table table = conn.getTable(tablePath)) {
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
+ for (int i = 0; i < expectedSize; i++) {
+ String value = i % 2 == 0 ? "hello, friend" + i : null;
+ GenericRow r = row(i, 100, value, i * 10L);
+ upsertWriter.upsert(r);
+ if (i % 10 == 0) {
+ upsertWriter.flush();
+ }
+ }
+ upsertWriter.flush();
+
+ // normal scan
+ try (LogScanner logScanner = createLogScanner(table)) {
+ subscribeFromBeginning(logScanner, table);
+ int count = 0;
+ while (count < expectedSize) {
+ ScanRecords scanRecords =
logScanner.poll(Duration.ofSeconds(1));
+ for (ScanRecord scanRecord : scanRecords) {
+
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT);
+ InternalRow rr = scanRecord.getRow();
+ assertThat(rr.getFieldCount()).isEqualTo(4);
+ assertThat(rr.getInt(0)).isEqualTo(count);
+ assertThat(rr.getInt(1)).isEqualTo(100);
+ if (count % 2 == 0) {
+ assertThat(rr.getString(2).toString())
+ .isEqualTo("hello, friend" + count);
+ } else {
+ assertThat(rr.isNullAt(2)).isTrue();
+ }
+ assertThat(rr.getLong(3)).isEqualTo(count * 10L);
+ count++;
+ }
+ }
+ assertThat(count).isEqualTo(expectedSize);
+ }
+
+ // Creating a projected log scanner for COMPACTED should work
+ try (LogScanner scanner = createLogScanner(table, new int[] {0,
2})) {
+ subscribeFromBeginning(scanner, table);
+ int count = 0;
+ while (count < expectedSize) {
+ ScanRecords records = scanner.poll(Duration.ofSeconds(1));
+ for (ScanRecord record : records) {
+ InternalRow row = record.getRow();
+ assertThat(row.getFieldCount()).isEqualTo(2);
+ assertThat(row.getInt(0)).isEqualTo(count);
+ if (count % 2 == 0) {
+ assertThat(row.getString(1).toString())
+ .isEqualTo("hello, friend" + count);
+ } else {
+ assertThat(row.isNullAt(1)).isTrue();
+ }
+ count++;
+ }
+ }
+ assertThat(count).isEqualTo(expectedSize);
+ }
+ }
+ }
+
+ @Test
+ void testPkUpdateAndDeleteWithCompactedLog() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .primaryKey("a")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(schema)
+ .kvFormat(KvFormat.COMPACTED)
+ .logFormat(LogFormat.COMPACTED)
+ .build();
+ TablePath tablePath = TablePath.of("test_db_1",
"test_pk_compacted_update_delete");
+ createTable(tablePath, tableDescriptor, false);
+
+ try (Table table = conn.getTable(tablePath)) {
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
+ // initial insert
+ upsertWriter.upsert(row(1, 10));
+ upsertWriter.flush();
+ // update same key
+ upsertWriter.upsert(row(1, 20));
+ upsertWriter.flush();
+ // delete the key
+ upsertWriter.delete(row(1, 20));
+ upsertWriter.flush();
+
+ LogScanner scanner = createLogScanner(table);
+ subscribeFromBeginning(scanner, table);
+ // Expect: +I(1,10), -U(1,10), +U(1,20), -D(1,20)
+ ChangeType[] expected = {
+ ChangeType.INSERT,
+ ChangeType.UPDATE_BEFORE,
+ ChangeType.UPDATE_AFTER,
+ ChangeType.DELETE
+ };
+ int seen = 0;
+ while (seen < expected.length) {
+ ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+ for (ScanRecord r : recs) {
+ assertThat(r.getChangeType()).isEqualTo(expected[seen]);
+ InternalRow row = r.getRow();
+ assertThat(row.getInt(0)).isEqualTo(1);
+ // value field present
+ if (expected[seen] == ChangeType.UPDATE_AFTER
+ || expected[seen] == ChangeType.DELETE) {
+ assertThat(row.getInt(1)).isEqualTo(20);
+ } else {
+ assertThat(row.getInt(1)).isEqualTo(10);
+ }
+ seen++;
+ }
+ }
+ assertThat(seen).isEqualTo(expected.length);
+ scanner.close();
+ }
+ }
+
+ @Test
+ void testPkCompactedPollFromLatestNoRecords() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .primaryKey("a")
+ .build();
+ TableDescriptor td =
+ TableDescriptor.builder()
+ .schema(schema)
+ .kvFormat(KvFormat.COMPACTED)
+ .logFormat(LogFormat.COMPACTED)
+ .build();
+ TablePath path = TablePath.of("test_db_1", "test_pk_compacted_latest");
+ createTable(path, td, false);
+
+ try (Table table = conn.getTable(path)) {
+ LogScanner scanner = createLogScanner(table);
+ subscribeFromLatestOffset(path, null, null, table, scanner, admin);
+ // Now write a few rows and ensure only these are seen
+ UpsertWriter upsert = table.newUpsert().createWriter();
+ for (int i = 0; i < 5; i++) {
+ upsert.upsert(row(i, i));
+ }
+ upsert.flush();
+
+ int seen = 0;
+ while (seen < 5) {
+ ScanRecords recs = scanner.poll(Duration.ofSeconds(1));
+ for (ScanRecord r : recs) {
+ assertThat(r.getChangeType()).isEqualTo(ChangeType.INSERT);
+ assertThat(r.getRow().getInt(0)).isBetween(0, 4);
+ seen++;
+ }
+ }
+
+ // delete non-existent key
+ upsert.delete(row(42, 0));
+ upsert.flush();
+ // poll a few times to ensure no accidental records
+ int total = 0;
+ for (int i = 0; i < 3; i++) {
+ total += scanner.poll(Duration.ofSeconds(1)).count();
+ }
+ assertThat(total).isEqualTo(0);
+
+ scanner.close();
+ }
+ }
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
similarity index 83%
copy from
fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
copy to
fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
index db39493bd..cbd6680a7 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
@@ -19,17 +19,15 @@ package org.apache.fluss.client.write;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.PreAllocatedPagedOutputView;
-import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.IndexedLogRecord;
+import org.apache.fluss.record.CompactedLogRecord;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.record.MemoryLogRecords;
-import org.apache.fluss.record.TestingSchemaGetter;
import org.apache.fluss.record.bytesview.BytesView;
-import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.utils.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
@@ -45,40 +43,38 @@ import static
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
-import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
-import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Test for {@link IndexedLogWriteBatch}. */
-public class IndexedLogWriteBatchTest {
- private IndexedRow row;
+/** Test for {@link CompactedLogWriteBatch}. */
+public class CompactedLogWriteBatchTest {
+ private CompactedRow row;
private int estimatedSizeInBytes;
@BeforeEach
void setup() {
- row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
- estimatedSizeInBytes = IndexedLogRecord.sizeOf(row);
+ row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+ estimatedSizeInBytes = CompactedLogRecord.sizeOf(row);
}
@Test
void testTryAppendWithWriteLimit() throws Exception {
int bucketId = 0;
int writeLimit = 100;
- IndexedLogWriteBatch logProducerBatch =
+ CompactedLogWriteBatch logProducerBatch =
createLogWriteBatch(
new TableBucket(DATA1_TABLE_ID, bucketId),
0L,
writeLimit,
MemorySegment.allocateHeapMemory(writeLimit));
- for (int i = 0;
- i
- < (writeLimit -
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
- / estimatedSizeInBytes;
- i++) {
+ int maxRecordsPerBatch =
+ (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
+ / estimatedSizeInBytes;
+ for (int i = 0; i < maxRecordsPerBatch; i++) {
boolean appendResult =
logProducerBatch.tryAppend(createWriteRecord(),
newWriteCallback());
assertThat(appendResult).isTrue();
@@ -92,7 +88,7 @@ public class IndexedLogWriteBatchTest {
@Test
void testToBytes() throws Exception {
int bucketId = 0;
- IndexedLogWriteBatch logProducerBatch =
+ CompactedLogWriteBatch logProducerBatch =
createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId),
0L);
boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(),
newWriteCallback());
assertThat(appendResult).isTrue();
@@ -101,15 +97,14 @@ public class IndexedLogWriteBatchTest {
BytesView bytesView = logProducerBatch.build();
MemoryLogRecords logRecords =
MemoryLogRecords.pointToBytesView(bytesView);
Iterator<LogRecordBatch> iterator = logRecords.batches().iterator();
- assertDefaultLogRecordBatchEquals(
- iterator.next(), new TestingSchemaGetter(new
SchemaInfo(DATA1_SCHEMA, (short) 1)));
+ assertDefaultLogRecordBatchEquals(iterator.next());
assertThat(iterator.hasNext()).isFalse();
}
@Test
void testCompleteTwice() throws Exception {
int bucketId = 0;
- IndexedLogWriteBatch logWriteBatch =
+ CompactedLogWriteBatch logWriteBatch =
createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId),
0L);
boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(),
newWriteCallback());
assertThat(appendResult).isTrue();
@@ -124,7 +119,7 @@ public class IndexedLogWriteBatchTest {
@Test
void testFailedTwice() throws Exception {
int bucketId = 0;
- IndexedLogWriteBatch logWriteBatch =
+ CompactedLogWriteBatch logWriteBatch =
createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId),
0L);
boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(),
newWriteCallback());
assertThat(appendResult).isTrue();
@@ -139,7 +134,7 @@ public class IndexedLogWriteBatchTest {
@Test
void testClose() throws Exception {
int bucketId = 0;
- IndexedLogWriteBatch logProducerBatch =
+ CompactedLogWriteBatch logProducerBatch =
createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId),
0L);
boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(),
newWriteCallback());
assertThat(appendResult).isTrue();
@@ -155,7 +150,7 @@ public class IndexedLogWriteBatchTest {
void testBatchAborted() throws Exception {
int bucketId = 0;
int writeLimit = 10240;
- IndexedLogWriteBatch logProducerBatch =
+ CompactedLogWriteBatch logProducerBatch =
createLogWriteBatch(
new TableBucket(DATA1_TABLE_ID, bucketId),
0L,
@@ -186,7 +181,7 @@ public class IndexedLogWriteBatchTest {
() -> logProducerBatch.tryAppend(createWriteRecord(),
newWriteCallback()))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
- "Tried to append a record, but
MemoryLogRecordsIndexedBuilder has already been aborted");
+ "Tried to append a record, but
MemoryLogRecordsCompactedBuilder has already been aborted");
// try to build.
assertThatThrownBy(logProducerBatch::build)
@@ -203,18 +198,19 @@ public class IndexedLogWriteBatchTest {
}
private WriteRecord createWriteRecord() {
- return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO,
DATA1_PHYSICAL_TABLE_PATH, row, null);
+ return WriteRecord.forCompactedAppend(
+ DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null);
}
- private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb, long
baseLogOffset)
+ private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb, long
baseLogOffset)
throws Exception {
return createLogWriteBatch(
tb, baseLogOffset, Integer.MAX_VALUE,
MemorySegment.allocateHeapMemory(1000));
}
- private IndexedLogWriteBatch createLogWriteBatch(
+ private CompactedLogWriteBatch createLogWriteBatch(
TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment
memorySegment) {
- return new IndexedLogWriteBatch(
+ return new CompactedLogWriteBatch(
tb.getBucket(),
DATA1_PHYSICAL_TABLE_PATH,
DATA1_TABLE_INFO.getSchemaId(),
@@ -223,14 +219,13 @@ public class IndexedLogWriteBatchTest {
System.currentTimeMillis());
}
- private void assertDefaultLogRecordBatchEquals(
- LogRecordBatch recordBatch, TestingSchemaGetter schemaGetter) {
+ private void assertDefaultLogRecordBatchEquals(LogRecordBatch recordBatch)
{
assertThat(recordBatch.getRecordCount()).isEqualTo(1);
assertThat(recordBatch.baseLogOffset()).isEqualTo(0L);
assertThat(recordBatch.schemaId()).isEqualTo((short)
DATA1_TABLE_INFO.getSchemaId());
try (LogRecordReadContext readContext =
- LogRecordReadContext.createIndexedReadContext(
- DATA1_ROW_TYPE,
DATA1_TABLE_INFO.getSchemaId(), schemaGetter);
+ LogRecordReadContext.createCompactedRowReadContext(
+ DATA1_ROW_TYPE,
DATA1_TABLE_INFO.getSchemaId());
CloseableIterator<LogRecord> iterator =
recordBatch.records(readContext)) {
assertThat(iterator.hasNext()).isTrue();
LogRecord record = iterator.next();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index db39493bd..aca1cde1b 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -74,11 +74,10 @@ public class IndexedLogWriteBatchTest {
writeLimit,
MemorySegment.allocateHeapMemory(writeLimit));
- for (int i = 0;
- i
- < (writeLimit -
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
- / estimatedSizeInBytes;
- i++) {
+ int maxRecordsPerBatch =
+ (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
+ / estimatedSizeInBytes;
+ for (int i = 0; i < maxRecordsPerBatch; i++) {
boolean appendResult =
logProducerBatch.tryAppend(createWriteRecord(),
newWriteCallback());
assertThat(appendResult).isTrue();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index 82f6ee590..c62c7b029 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -207,6 +207,7 @@ class KvWriteBatchTest {
row,
key,
key,
+ WriteFormat.COMPACTED_KV,
null);
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
index e23aa5cc3..13cda3325 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
@@ -30,8 +30,7 @@ import static
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
* Default builder for {@link MemoryLogRecords} of log records in {@link
LogFormat#COMPACTED}
* format.
*/
-public class MemoryLogRecordsCompactedBuilder
- extends AbstractRowMemoryLogRecordsBuilder<CompactedRow> {
+public class MemoryLogRecordsCompactedBuilder extends
MemoryLogRecordsRowBuilder<CompactedRow> {
private MemoryLogRecordsCompactedBuilder(
long baseLogOffset,
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index a713c34ee..34688ef55 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -29,7 +29,7 @@ import static
org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
* Default builder for {@link MemoryLogRecords} of log records in {@link
* org.apache.fluss.metadata.LogFormat#INDEXED} format.
*/
-public class MemoryLogRecordsIndexedBuilder extends
AbstractRowMemoryLogRecordsBuilder<IndexedRow> {
+public class MemoryLogRecordsIndexedBuilder extends
MemoryLogRecordsRowBuilder<IndexedRow> {
private MemoryLogRecordsIndexedBuilder(
long baseLogOffset,
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
similarity index 98%
rename from
fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
rename to
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
index 423b35a3b..127503aa0 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsRowBuilder.java
@@ -39,7 +39,7 @@ import static
org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
import static org.apache.fluss.utils.Preconditions.checkArgument;
/** Abstract base builder for row-based MemoryLogRecords builders sharing
common logic. */
-abstract class AbstractRowMemoryLogRecordsBuilder<T> implements AutoCloseable {
+public abstract class MemoryLogRecordsRowBuilder<T> implements AutoCloseable {
protected static final int BUILDER_DEFAULT_OFFSET = 0;
protected final long baseLogOffset;
@@ -59,7 +59,7 @@ abstract class AbstractRowMemoryLogRecordsBuilder<T>
implements AutoCloseable {
private volatile boolean isClosed;
private boolean aborted = false;
- protected AbstractRowMemoryLogRecordsBuilder(
+ protected MemoryLogRecordsRowBuilder(
long baseLogOffset,
int schemaId,
int writeLimit,