This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 03c860243 [kv] Supports compacted row as change log (#2108)
03c860243 is described below
commit 03c8602433a9872ab0e7d20624d021ac0c6e33cc
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Dec 16 11:17:56 2025 +0800
[kv] Supports compacted row as change log (#2108)
---------
Co-authored-by: ipolyzos <[email protected]>
---
.../fluss/client/admin/FlussAdminITCase.java | 2 +-
.../fluss/client/table/FlussTableITCase.java | 16 +-
.../org/apache/fluss/config/ConfigOptions.java | 2 +-
.../java/org/apache/fluss/metadata/LogFormat.java | 24 ++-
...ava => AbstractRowMemoryLogRecordsBuilder.java} | 91 ++++------
...dexedLogRecord.java => CompactedLogRecord.java} | 94 ++++------
.../apache/fluss/record/DefaultLogRecordBatch.java | 29 +++
.../org/apache/fluss/record/IndexedLogRecord.java | 19 +-
.../java/org/apache/fluss/record/LogRecord.java | 28 +++
.../apache/fluss/record/LogRecordReadContext.java | 25 +++
.../fluss/record/MemoryLogRecordsArrowBuilder.java | 2 +-
.../record/MemoryLogRecordsCompactedBuilder.java | 78 ++++++++
.../record/MemoryLogRecordsIndexedBuilder.java | 196 +--------------------
.../fluss/record/CompactedLogRecordTest.java | 89 ++++++++++
.../MemoryLogRecordsCompactedBuilderTest.java | 189 ++++++++++++++++++++
.../apache/fluss/server/kv/KvRecoverHelper.java | 20 ++-
.../java/org/apache/fluss/server/kv/KvTablet.java | 3 +
.../fluss/server/kv/wal/CompactedWalBuilder.java | 92 ++++++++++
.../org/apache/fluss/server/replica/Replica.java | 1 +
.../server/utils/TableDescriptorValidation.java | 8 +-
.../server/replica/KvReplicaRestoreITCase.java | 13 +-
21 files changed, 678 insertions(+), 343 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index bd7954284..62026ca2f 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -659,7 +659,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(
- "Currently, Primary Key Table only supports ARROW log
format if kv format is COMPACTED.");
+ "Currently, Primary Key Table supports ARROW or
COMPACTED log format when kv format is COMPACTED.");
}
@Test
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index bfd9926df..0cc2f7f10 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -753,6 +753,11 @@ class FlussTableITCase extends ClientToServerITCaseBase {
verifyAppendOrPut(false, "ARROW", kvFormat);
}
+ @Test
+ void testPutAndPollCompacted() throws Exception {
+ verifyAppendOrPut(false, "COMPACTED", "COMPACTED");
+ }
+
void verifyAppendOrPut(boolean append, String logFormat, @Nullable String
kvFormat)
throws Exception {
Schema schema =
@@ -911,8 +916,9 @@ class FlussTableITCase extends ClientToServerITCaseBase {
}
}
- @Test
- void testPutAndProject() throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"ARROW", "COMPACTED"})
+ void testPutAndProject(String changelogFormat) throws Exception {
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.INT())
@@ -921,7 +927,11 @@ class FlussTableITCase extends ClientToServerITCaseBase {
.column("d", DataTypes.BIGINT())
.primaryKey("a")
.build();
- TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(schema)
+ .property(ConfigOptions.TABLE_LOG_FORMAT.key(),
changelogFormat)
+ .build();
TablePath tablePath = TablePath.of("test_db_1", "test_pk_table_1");
createTable(tablePath, tableDescriptor, false);
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 38c36c4c4..d571db270 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1230,7 +1230,7 @@ public class ConfigOptions {
.defaultValue(LogFormat.ARROW)
.withDescription(
"The format of the log records in log store. The
default value is `arrow`. "
- + "The supported formats are `arrow` and
`indexed`.");
+ + "The supported formats are `arrow`,
`indexed` and `compacted`.");
public static final ConfigOption<ArrowCompressionType>
TABLE_LOG_ARROW_COMPRESSION_TYPE =
key("table.log.arrow.compression.type")
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
index d1b6de4ba..b5f460162 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/LogFormat.java
@@ -18,10 +18,15 @@
package org.apache.fluss.metadata;
import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
+import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.indexed.IndexedRow;
-/** The format of the log records in log store. The supported formats are
'arrow' and 'indexed'. */
+/**
+ * The format of the log records in log store. The supported formats are
'arrow', 'indexed' and
+ * 'compacted'.
+ */
public enum LogFormat {
/**
@@ -41,11 +46,20 @@ public enum LogFormat {
*
* @see MemoryLogRecordsIndexedBuilder
*/
- INDEXED;
+ INDEXED,
+
+ /**
+ * The log record batches are stored in {@link CompactedRow} format which
is a compact
+ * row-oriented format optimized for primary key tables to reduce storage
while trading CPU for
+ * reads.
+ *
+ * @see MemoryLogRecordsCompactedBuilder
+ */
+ COMPACTED;
/**
- * Creates a {@link LogFormat} from the given string. The string must be
either 'arrow' or
- * 'indexed'.
+ * Creates a {@link LogFormat} from the given string. The string must be
either 'arrow',
+ * 'indexed' or 'compacted'.
*/
public static LogFormat fromString(String format) {
switch (format.toUpperCase()) {
@@ -53,6 +67,8 @@ public enum LogFormat {
return ARROW;
case "INDEXED":
return INDEXED;
+ case "COMPACTED":
+ return COMPACTED;
default:
throw new IllegalArgumentException("Unsupported log format: "
+ format);
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
similarity index 72%
copy from
fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
copy to
fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
index 8488c3351..423b35a3b 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/AbstractRowMemoryLogRecordsBuilder.java
@@ -17,19 +17,15 @@
package org.apache.fluss.record;
-import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.memory.AbstractPagedOutputView;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.MemorySegmentOutputView;
-import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.record.bytesview.MultiBytesView;
-import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.utils.crc.Crc32C;
import java.io.IOException;
-import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
@@ -42,20 +38,18 @@ import static
org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize
import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
import static org.apache.fluss.utils.Preconditions.checkArgument;
-/**
- * Default builder for {@link MemoryLogRecords} of log records in {@link
LogFormat#INDEXED} format.
- */
-public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
- private static final int BUILDER_DEFAULT_OFFSET = 0;
+/** Abstract base builder for row-based MemoryLogRecords builders sharing
common logic. */
+abstract class AbstractRowMemoryLogRecordsBuilder<T> implements AutoCloseable {
+ protected static final int BUILDER_DEFAULT_OFFSET = 0;
- private final long baseLogOffset;
- private final int schemaId;
+ protected final long baseLogOffset;
+ protected final int schemaId;
// The max bytes can be appended.
- private final int writeLimit;
- private final byte magic;
- private final AbstractPagedOutputView pagedOutputView;
- private final MemorySegment firstSegment;
- private final boolean appendOnly;
+ protected final int writeLimit;
+ protected final byte magic;
+ protected final AbstractPagedOutputView pagedOutputView;
+ protected final MemorySegment firstSegment;
+ protected final boolean appendOnly;
private BytesView builtBuffer = null;
private long writerId;
@@ -65,7 +59,7 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
private volatile boolean isClosed;
private boolean aborted = false;
- private MemoryLogRecordsIndexedBuilder(
+ protected AbstractRowMemoryLogRecordsBuilder(
long baseLogOffset,
int schemaId,
int writeLimit,
@@ -87,64 +81,44 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
this.currentRecordNumber = 0;
this.isClosed = false;
- // We don't need to write header information while the builder
creating,
- // we'll skip it first.
- this.pagedOutputView.setPosition(recordBatchHeaderSize(magic));
- this.sizeInBytes = recordBatchHeaderSize(magic);
+ // Skip header initially; will be written in build()
+ int headerSize = recordBatchHeaderSize(magic);
+ this.pagedOutputView.setPosition(headerSize);
+ this.sizeInBytes = headerSize;
}
- public static MemoryLogRecordsIndexedBuilder builder(
- int schemaId, int writeLimit, AbstractPagedOutputView outputView,
boolean appendOnly) {
- return new MemoryLogRecordsIndexedBuilder(
- BUILDER_DEFAULT_OFFSET,
- schemaId,
- writeLimit,
- CURRENT_LOG_MAGIC_VALUE,
- outputView,
- appendOnly);
- }
+ /** Implement to return size of the record (including length field). */
+ protected abstract int sizeOf(T row);
- @VisibleForTesting
- public static MemoryLogRecordsIndexedBuilder builder(
- long baseLogOffset,
- int schemaId,
- int writeLimit,
- byte magic,
- AbstractPagedOutputView outputView)
- throws IOException {
- return new MemoryLogRecordsIndexedBuilder(
- baseLogOffset, schemaId, writeLimit, magic, outputView, false);
- }
+ /** Implement to write the record and return total written bytes including
length field. */
+ protected abstract int writeRecord(ChangeType changeType, T row) throws
IOException;
- /**
- * Check if we have room for a new record containing the given row. If no
records have been
- * appended, then this returns true.
- */
- public boolean hasRoomFor(IndexedRow row) {
- return sizeInBytes + IndexedLogRecord.sizeOf(row) <= writeLimit;
+ public boolean hasRoomFor(T row) {
+ return sizeInBytes + sizeOf(row) <= writeLimit;
}
- public void append(ChangeType changeType, IndexedRow row) throws Exception
{
+ public void append(ChangeType changeType, T row) throws Exception {
appendRecord(changeType, row);
}
- private void appendRecord(ChangeType changeType, IndexedRow row) throws
IOException {
+ private void appendRecord(ChangeType changeType, T row) throws IOException
{
if (aborted) {
throw new IllegalStateException(
- "Tried to append a record, but
MemoryLogRecordsIndexedBuilder has already been aborted");
+ "Tried to append a record, but "
+ + getClass().getSimpleName()
+ + " has already been aborted");
}
-
if (isClosed) {
throw new IllegalStateException(
"Tried to append a record, but MemoryLogRecordsBuilder is
closed for record appends");
}
if (appendOnly && changeType != ChangeType.APPEND_ONLY) {
throw new IllegalArgumentException(
- "Only append-only change type is allowed for append-only
arrow log builder, but got "
+ "Only append-only change type is allowed for append-only
row log builder, but got "
+ changeType);
}
- int recordByteSizes = IndexedLogRecord.writeTo(pagedOutputView,
changeType, row);
+ int recordByteSizes = writeRecord(changeType, row);
currentRecordNumber++;
sizeInBytes += recordByteSizes;
}
@@ -153,11 +127,9 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
if (aborted) {
throw new IllegalStateException("Attempting to build an aborted
record batch");
}
-
if (builtBuffer != null) {
return builtBuffer;
}
-
writeBatchHeader();
builtBuffer =
MultiBytesView.builder()
@@ -198,9 +170,10 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
public void close() throws IOException {
if (aborted) {
throw new IllegalStateException(
- "Cannot close MemoryLogRecordsIndexedBuilder as it has
already been aborted");
+ "Cannot close "
+ + getClass().getSimpleName()
+ + " as it has already been aborted");
}
-
isClosed = true;
}
@@ -238,7 +211,7 @@ public class MemoryLogRecordsIndexedBuilder implements
AutoCloseable {
if (currentRecordNumber > 0) {
outputView.writeInt(currentRecordNumber - 1);
} else {
- // If there is no record, we write 0 for filed lastOffsetDelta,
see the comments about
+ // If there is no record, we write 0 for field lastOffsetDelta,
see the comments about
// the field 'lastOffsetDelta' in DefaultLogRecordBatch.
outputView.writeInt(0);
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
b/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java
similarity index 60%
copy from
fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
copy to
fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java
index 88bc6a3c7..fb7f2f314 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/CompactedLogRecord.java
@@ -23,8 +23,9 @@ import org.apache.fluss.memory.OutputView;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.indexed.IndexedRow;
-import org.apache.fluss.row.indexed.IndexedRowWriter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.compacted.CompactedRowWriter;
import org.apache.fluss.types.DataType;
import org.apache.fluss.utils.MurmurHashUtils;
@@ -32,30 +33,30 @@ import java.io.IOException;
import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
-/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
/**
- * This class is an immutable log record and can be directly persisted. The
schema is as follows:
+ * An immutable log record for {@link CompactedRow} which can be directly
persisted. The on-wire
+ * schema is identical to IndexedLogRecord but the row payload uses the
CompactedRow binary format:
*
* <ul>
- * <li>Length => int32
- * <li>Attributes => Int8
- * <li>Value => {@link InternalRow}
+ * <li>Length => int32 (total number of bytes following this length field)
+ * <li>Attributes => int8 (low 4 bits encode {@link ChangeType})
+ * <li>Value => {@link CompactedRow} (bytes in compacted row format)
* </ul>
*
- * <p>The current record attributes are depicted below:
- *
- * <p>----------- | ChangeType (0-3) | Unused (4-7) |---------------
+ * <p>Differences vs {@link IndexedLogRecord}: - Uses CompactedRow encoding
which is space-optimized
+ * (VLQ for ints/longs, per-row null bitset) and trades CPU for smaller
storage; random access to
+ * fields is not supported without decoding. - Deserialization is lazy: we
wrap the underlying bytes
+ * in a CompactedRow with a {@link CompactedRowDeserializer} and only decode
to object values when a
+ * field is accessed. - The record header (Length + Attributes) layout and
attribute semantics are
+ * the same.
*
- * <p>The offset compute the difference relative to the base offset and of the
batch that this
- * record is contained in.
+ * <p>The offset computes the difference relative to the base offset of the
batch containing this
+ * record.
*
- * @since 0.1
+ * @since 0.8
*/
@PublicEvolving
-public class IndexedLogRecord implements LogRecord {
+public class CompactedLogRecord implements LogRecord {
private static final int ATTRIBUTES_LENGTH = 1;
@@ -67,13 +68,13 @@ public class IndexedLogRecord implements LogRecord {
private int offset;
private int sizeInBytes;
- IndexedLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) {
+ CompactedLogRecord(long logOffset, long timestamp, DataType[] fieldTypes) {
this.logOffset = logOffset;
- this.fieldTypes = fieldTypes;
this.timestamp = timestamp;
+ this.fieldTypes = fieldTypes;
}
- void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+ private void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
this.segment = segment;
this.offset = offset;
this.sizeInBytes = sizeInBytes;
@@ -88,12 +89,10 @@ public class IndexedLogRecord implements LogRecord {
if (this == o) {
return true;
}
-
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- IndexedLogRecord that = (IndexedLogRecord) o;
+ CompactedLogRecord that = (CompactedLogRecord) o;
return sizeInBytes == that.sizeInBytes
&& segment.equalTo(that.segment, offset, that.offset,
sizeInBytes);
}
@@ -122,41 +121,35 @@ public class IndexedLogRecord implements LogRecord {
@Override
public InternalRow getRow() {
int rowOffset = LENGTH_LENGTH + ATTRIBUTES_LENGTH;
- // TODO currently, we only support indexed row.
- return deserializeInternalRow(
+ return LogRecord.deserializeInternalRow(
sizeInBytes - rowOffset,
segment,
offset + rowOffset,
fieldTypes,
- LogFormat.INDEXED);
+ LogFormat.COMPACTED);
}
- /** Write the record to input `target` and return its size. */
- public static int writeTo(OutputView outputView, ChangeType changeType,
IndexedRow row)
+ /** Write the record to output and return total bytes written including
length field. */
+ public static int writeTo(OutputView outputView, ChangeType changeType,
CompactedRow row)
throws IOException {
int sizeInBytes = calculateSizeInBytes(row);
-
- // TODO using varint instead int to reduce storage size.
- // write record total bytes size.
+ // write record total bytes size (excluding this int itself)
outputView.writeInt(sizeInBytes);
-
- // write attributes.
+ // write attributes
outputView.writeByte(changeType.toByteValue());
-
- // write internal row.
- serializeInternalRow(outputView, row);
-
+ // write row payload
+ CompactedRowWriter.serializeCompactedRow(row, outputView);
return sizeInBytes + LENGTH_LENGTH;
}
- public static IndexedLogRecord readFrom(
+ public static CompactedLogRecord readFrom(
MemorySegment segment,
int position,
long logOffset,
long logTimestamp,
DataType[] colTypes) {
int sizeInBytes = segment.getInt(position);
- IndexedLogRecord logRecord = new IndexedLogRecord(logOffset,
logTimestamp, colTypes);
+ CompactedLogRecord logRecord = new CompactedLogRecord(logOffset,
logTimestamp, colTypes);
logRecord.pointTo(segment, position, sizeInBytes + LENGTH_LENGTH);
return logRecord;
}
@@ -167,29 +160,8 @@ public class IndexedLogRecord implements LogRecord {
}
private static int calculateSizeInBytes(BinaryRow row) {
- int size = 1; // always one byte for attributes
+ int size = 1; // one byte for attributes
size += row.getSizeInBytes();
return size;
}
-
- private static void serializeInternalRow(OutputView outputView, IndexedRow
row)
- throws IOException {
- IndexedRowWriter.serializeIndexedRow(row, outputView);
- }
-
- private static InternalRow deserializeInternalRow(
- int length,
- MemorySegment segment,
- int position,
- DataType[] fieldTypes,
- LogFormat logFormat) {
- if (logFormat == LogFormat.INDEXED) {
- IndexedRow indexedRow = new IndexedRow(fieldTypes);
- indexedRow.pointTo(segment, position, length);
- return indexedRow;
- } else {
- throw new IllegalArgumentException(
- "No such internal row deserializer for: " + logFormat);
- }
- }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index 88cba5497..188a62bbd 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -233,6 +233,8 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
case INDEXED:
return rowRecordIterator(
rowType, context.getOutputProjectedRow(schemaId),
timestamp);
+ case COMPACTED:
+ return compactedRowRecordIterator(rowType, timestamp);
default:
throw new IllegalArgumentException("Unsupported log format: "
+ logFormat);
}
@@ -295,6 +297,33 @@ public class DefaultLogRecordBatch implements
LogRecordBatch {
};
}
+ private CloseableIterator<LogRecord> compactedRowRecordIterator(
+ RowType rowType, long timestamp) {
+ DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
+ return new LogRecordIterator() {
+ int position = DefaultLogRecordBatch.this.position +
recordBatchHeaderSize(magic);
+ int rowId = 0;
+
+ @Override
+ protected LogRecord readNext(long baseOffset) {
+ CompactedLogRecord logRecord =
+ CompactedLogRecord.readFrom(
+ segment, position, baseOffset + rowId,
timestamp, fieldTypes);
+ rowId++;
+ position += logRecord.getSizeInBytes();
+ return logRecord;
+ }
+
+ @Override
+ protected boolean ensureNoneRemaining() {
+ return true;
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
private CloseableIterator<LogRecord> columnRecordIterator(
RowType rowType,
@Nullable ProjectedRow outputProjection,
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
index 88bc6a3c7..cf2abd90d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java
@@ -122,8 +122,7 @@ public class IndexedLogRecord implements LogRecord {
@Override
public InternalRow getRow() {
int rowOffset = LENGTH_LENGTH + ATTRIBUTES_LENGTH;
- // TODO currently, we only support indexed row.
- return deserializeInternalRow(
+ return LogRecord.deserializeInternalRow(
sizeInBytes - rowOffset,
segment,
offset + rowOffset,
@@ -176,20 +175,4 @@ public class IndexedLogRecord implements LogRecord {
throws IOException {
IndexedRowWriter.serializeIndexedRow(row, outputView);
}
-
- private static InternalRow deserializeInternalRow(
- int length,
- MemorySegment segment,
- int position,
- DataType[] fieldTypes,
- LogFormat logFormat) {
- if (logFormat == LogFormat.INDEXED) {
- IndexedRow indexedRow = new IndexedRow(fieldTypes);
- indexedRow.pointTo(segment, position, length);
- return indexedRow;
- } else {
- throw new IllegalArgumentException(
- "No such internal row deserializer for: " + logFormat);
- }
- }
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
index ceef8e6ed..34df26ebb 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecord.java
@@ -18,7 +18,13 @@
package org.apache.fluss.record;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.indexed.IndexedRow;
+import org.apache.fluss.types.DataType;
/**
* A log record is a tuple consisting of a unique offset in the log, a
changeType and a row.
@@ -55,4 +61,26 @@ public interface LogRecord {
* @return the log record's row
*/
InternalRow getRow();
+
+ /** Deserialize the row in the log record according to given log format. */
+ static InternalRow deserializeInternalRow(
+ int length,
+ MemorySegment segment,
+ int position,
+ DataType[] fieldTypes,
+ LogFormat logFormat) {
+ if (logFormat == LogFormat.INDEXED) {
+ IndexedRow indexedRow = new IndexedRow(fieldTypes);
+ indexedRow.pointTo(segment, position, length);
+ return indexedRow;
+ } else if (logFormat == LogFormat.COMPACTED) {
+ CompactedRow compactedRow =
+ new CompactedRow(fieldTypes.length, new
CompactedRowDeserializer(fieldTypes));
+ compactedRow.pointTo(segment, position, length);
+ return compactedRow;
+ } else {
+ throw new IllegalArgumentException(
+ "No such internal row deserializer for: " + logFormat);
+ }
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 7289d2d67..6d3ee99af 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -103,6 +103,9 @@ public class LogRecordReadContext implements
LogRecordBatch.ReadContext, AutoClo
} else if (logFormat == LogFormat.INDEXED) {
int[] selectedFields = projection.getProjectionPositions();
return createIndexedReadContext(rowType, schemaId, selectedFields,
schemaGetter);
+ } else if (logFormat == LogFormat.COMPACTED) {
+ int[] selectedFields = projection.getProjectionPositions();
+ return createCompactedRowReadContext(rowType, schemaId,
selectedFields);
} else {
throw new IllegalArgumentException("Unsupported log format: " +
logFormat);
}
@@ -156,6 +159,13 @@ public class LogRecordReadContext implements
LogRecordBatch.ReadContext, AutoClo
return createIndexedReadContext(rowType, schemaId, selectedFields,
schemaGetter);
}
+ /** Creates a LogRecordReadContext for COMPACTED log format. */
+ public static LogRecordReadContext createCompactedRowReadContext(
+ RowType rowType, int schemaId) {
+ int[] selectedFields = IntStream.range(0,
rowType.getFieldCount()).toArray();
+ return createCompactedRowReadContext(rowType, schemaId,
selectedFields);
+ }
+
/**
* Creates a LogRecordReadContext for INDEXED log format.
*
@@ -172,6 +182,21 @@ public class LogRecordReadContext implements
LogRecordBatch.ReadContext, AutoClo
LogFormat.INDEXED, rowType, schemaId, null, fieldGetters,
false, schemaGetter);
}
+ /**
+ * Creates a LogRecordReadContext for COMPACTED log format.
+ *
+ * @param rowType the schema of the read data
+ * @param schemaId the schemaId of the table
+ * @param selectedFields the final selected fields of the read data
+ */
+ public static LogRecordReadContext createCompactedRowReadContext(
+ RowType rowType, int schemaId, int[] selectedFields) {
+ FieldGetter[] fieldGetters = buildProjectedFieldGetters(rowType,
selectedFields);
+ // for COMPACTED log format, the projection is NEVER push downed to
the server side
+ return new LogRecordReadContext(
+ LogFormat.COMPACTED, rowType, schemaId, null, fieldGetters,
false, null);
+ }
+
private LogRecordReadContext(
LogFormat logFormat,
RowType targetDataRowType,
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
index 94b2fe59f..6624f14bd 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -279,7 +279,7 @@ public class MemoryLogRecordsArrowBuilder implements
AutoCloseable {
if (recordCount > 0) {
outputView.writeInt(recordCount - 1);
} else {
- // If there is no record, we write 0 for filed lastOffsetDelta,
see the comments about
+ // If there is no record, we write 0 for field lastOffsetDelta,
see the comments about
// the field 'lastOffsetDelta' in DefaultLogRecordBatch.
outputView.writeInt(0);
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
new file mode 100644
index 000000000..e23aa5cc3
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.memory.AbstractPagedOutputView;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.row.compacted.CompactedRow;
+
+import java.io.IOException;
+
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+
+/**
+ * Default builder for {@link MemoryLogRecords} of log records in {@link
LogFormat#COMPACTED}
+ * format.
+ */
+public class MemoryLogRecordsCompactedBuilder
+ extends AbstractRowMemoryLogRecordsBuilder<CompactedRow> {
+
+ private MemoryLogRecordsCompactedBuilder(
+ long baseLogOffset,
+ int schemaId,
+ int writeLimit,
+ byte magic,
+ AbstractPagedOutputView pagedOutputView,
+ boolean appendOnly) {
+ super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView,
appendOnly);
+ }
+
+ public static MemoryLogRecordsCompactedBuilder builder(
+ int schemaId, int writeLimit, AbstractPagedOutputView outputView,
boolean appendOnly) {
+ return new MemoryLogRecordsCompactedBuilder(
+ BUILDER_DEFAULT_OFFSET,
+ schemaId,
+ writeLimit,
+ CURRENT_LOG_MAGIC_VALUE,
+ outputView,
+ appendOnly);
+ }
+
+ @VisibleForTesting
+ public static MemoryLogRecordsCompactedBuilder builder(
+ long baseLogOffset,
+ int schemaId,
+ int writeLimit,
+ byte magic,
+ AbstractPagedOutputView outputView)
+ throws IOException {
+ return new MemoryLogRecordsCompactedBuilder(
+ baseLogOffset, schemaId, writeLimit, magic, outputView, false);
+ }
+
+ @Override
+ protected int sizeOf(CompactedRow row) {
+ return CompactedLogRecord.sizeOf(row);
+ }
+
+ @Override
+ protected int writeRecord(ChangeType changeType, CompactedRow row) throws
IOException {
+ return CompactedLogRecord.writeTo(pagedOutputView, changeType, row);
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
index 8488c3351..a713c34ee 100644
---
a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
+++
b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java
@@ -19,51 +19,17 @@ package org.apache.fluss.record;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.memory.AbstractPagedOutputView;
-import org.apache.fluss.memory.MemorySegment;
-import org.apache.fluss.memory.MemorySegmentOutputView;
-import org.apache.fluss.metadata.LogFormat;
-import org.apache.fluss.record.bytesview.BytesView;
-import org.apache.fluss.record.bytesview.MultiBytesView;
import org.apache.fluss.row.indexed.IndexedRow;
-import org.apache.fluss.utils.crc.Crc32C;
import java.io.IOException;
import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
-import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH;
-import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH;
-import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH;
-import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
-import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset;
-import static org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset;
-import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
-import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset;
-import static org.apache.fluss.utils.Preconditions.checkArgument;
/**
- * Default builder for {@link MemoryLogRecords} of log records in {@link LogFormat#INDEXED} format.
+ * Default builder for {@link MemoryLogRecords} of log records in {@link
+ * org.apache.fluss.metadata.LogFormat#INDEXED} format.
*/
-public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
- private static final int BUILDER_DEFAULT_OFFSET = 0;
-
- private final long baseLogOffset;
- private final int schemaId;
- // The max bytes can be appended.
- private final int writeLimit;
- private final byte magic;
- private final AbstractPagedOutputView pagedOutputView;
- private final MemorySegment firstSegment;
- private final boolean appendOnly;
-
- private BytesView builtBuffer = null;
- private long writerId;
- private int batchSequence;
- private int currentRecordNumber;
- private int sizeInBytes;
- private volatile boolean isClosed;
- private boolean aborted = false;
+public class MemoryLogRecordsIndexedBuilder extends AbstractRowMemoryLogRecordsBuilder<IndexedRow> {
private MemoryLogRecordsIndexedBuilder(
long baseLogOffset,
@@ -72,25 +38,7 @@ public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
byte magic,
AbstractPagedOutputView pagedOutputView,
boolean appendOnly) {
- this.appendOnly = appendOnly;
- checkArgument(
- schemaId <= Short.MAX_VALUE,
- "schemaId shouldn't be greater than the max value of short: "
+ Short.MAX_VALUE);
- this.baseLogOffset = baseLogOffset;
- this.schemaId = schemaId;
- this.writeLimit = writeLimit;
- this.magic = magic;
- this.pagedOutputView = pagedOutputView;
- this.firstSegment = pagedOutputView.getCurrentSegment();
- this.writerId = NO_WRITER_ID;
- this.batchSequence = NO_BATCH_SEQUENCE;
- this.currentRecordNumber = 0;
- this.isClosed = false;
-
-        // We don't need to write header information while the builder creating,
- // we'll skip it first.
- this.pagedOutputView.setPosition(recordBatchHeaderSize(magic));
- this.sizeInBytes = recordBatchHeaderSize(magic);
+        super(baseLogOffset, schemaId, writeLimit, magic, pagedOutputView, appendOnly);
}
public static MemoryLogRecordsIndexedBuilder builder(
@@ -116,139 +64,13 @@ public class MemoryLogRecordsIndexedBuilder implements AutoCloseable {
baseLogOffset, schemaId, writeLimit, magic, outputView, false);
}
- /**
-     * Check if we have room for a new record containing the given row. If no records have been
- * appended, then this returns true.
- */
- public boolean hasRoomFor(IndexedRow row) {
- return sizeInBytes + IndexedLogRecord.sizeOf(row) <= writeLimit;
- }
-
-    public void append(ChangeType changeType, IndexedRow row) throws Exception {
- appendRecord(changeType, row);
- }
-
-    private void appendRecord(ChangeType changeType, IndexedRow row) throws IOException {
- if (aborted) {
- throw new IllegalStateException(
- "Tried to append a record, but
MemoryLogRecordsIndexedBuilder has already been aborted");
- }
-
- if (isClosed) {
- throw new IllegalStateException(
- "Tried to append a record, but MemoryLogRecordsBuilder is
closed for record appends");
- }
- if (appendOnly && changeType != ChangeType.APPEND_ONLY) {
- throw new IllegalArgumentException(
- "Only append-only change type is allowed for append-only
arrow log builder, but got "
- + changeType);
- }
-
-        int recordByteSizes = IndexedLogRecord.writeTo(pagedOutputView, changeType, row);
- currentRecordNumber++;
- sizeInBytes += recordByteSizes;
- }
-
- public BytesView build() throws IOException {
- if (aborted) {
-            throw new IllegalStateException("Attempting to build an aborted record batch");
- }
-
- if (builtBuffer != null) {
- return builtBuffer;
- }
-
- writeBatchHeader();
- builtBuffer =
- MultiBytesView.builder()
-                        .addMemorySegmentByteViewList(pagedOutputView.getWrittenSegments())
- .build();
- return builtBuffer;
- }
-
- public void setWriterState(long writerId, int batchBaseSequence) {
- this.writerId = writerId;
- this.batchSequence = batchBaseSequence;
- }
-
- public void resetWriterState(long writerId, int batchSequence) {
- // trigger to rewrite batch header
- this.builtBuffer = null;
- this.writerId = writerId;
- this.batchSequence = batchSequence;
- }
-
- public long writerId() {
- return writerId;
- }
-
- public int batchSequence() {
- return batchSequence;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-
- public void abort() {
- aborted = true;
- }
-
@Override
- public void close() throws IOException {
- if (aborted) {
- throw new IllegalStateException(
- "Cannot close MemoryLogRecordsIndexedBuilder as it has
already been aborted");
- }
-
- isClosed = true;
- }
-
- public int getSizeInBytes() {
- return sizeInBytes;
+ protected int sizeOf(IndexedRow row) {
+ return IndexedLogRecord.sizeOf(row);
}
- // ----------------------- internal methods -------------------------------
- private void writeBatchHeader() throws IOException {
- // pagedOutputView doesn't support seek to previous segment,
- // so we create a new output view on the first segment
-        MemorySegmentOutputView outputView = new MemorySegmentOutputView(firstSegment);
- outputView.setPosition(0);
- // update header.
- outputView.writeLong(baseLogOffset);
- outputView.writeInt(sizeInBytes - BASE_OFFSET_LENGTH - LENGTH_LENGTH);
- outputView.writeByte(magic);
-
- // write empty timestamp which will be overridden on server side
- outputView.writeLong(0);
-
- // write empty leaderEpoch which will be overridden on server side
- if (magic >= LOG_MAGIC_VALUE_V1) {
- outputView.writeInt(NO_LEADER_EPOCH);
- }
-
- // write empty crc first.
- outputView.writeUnsignedInt(0);
-
- outputView.writeShort((short) schemaId);
- // write attributes (currently only appendOnly flag)
- outputView.writeBoolean(appendOnly);
- // skip write attribute byte for now.
- outputView.setPosition(lastOffsetDeltaOffset(magic));
- if (currentRecordNumber > 0) {
- outputView.writeInt(currentRecordNumber - 1);
- } else {
-            // If there is no record, we write 0 for filed lastOffsetDelta, see the comments about
- // the field 'lastOffsetDelta' in DefaultLogRecordBatch.
- outputView.writeInt(0);
- }
- outputView.writeLong(writerId);
- outputView.writeInt(batchSequence);
- outputView.writeInt(currentRecordNumber);
-
- // Update crc.
-        long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic));
- outputView.setPosition(crcOffset(magic));
- outputView.writeUnsignedInt(crc);
+ @Override
+    protected int writeRecord(ChangeType changeType, IndexedRow row) throws IOException {
+ return IndexedLogRecord.writeTo(pagedOutputView, changeType, row);
}
}
diff --git a/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java b/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java
new file mode 100644
index 000000000..cfec405f1
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/record/CompactedLogRecordTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.TestInternalRowGenerator;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.compacted.CompactedRowDeserializer;
+import org.apache.fluss.row.compacted.CompactedRowWriter;
+import org.apache.fluss.types.DataType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactedLogRecord}. */
+class CompactedLogRecordTest extends LogTestBase {
+
+    @Test
+    void testBase() throws IOException {
+        DataType[] fieldTypes = baseRowType.getChildren().toArray(new DataType[0]);
+
+        CompactedRowWriter writer = new CompactedRowWriter(fieldTypes.length);
+        // field 0: int 10
+        writer.writeInt(10);
+        // field 1: string "abc"
+        writer.writeString(BinaryString.fromString("abc"));
+        byte[] bytes = writer.toBytes();
+        CompactedRow row =
+                CompactedRow.from(fieldTypes, bytes, new CompactedRowDeserializer(fieldTypes));
+
+        CompactedLogRecord.writeTo(outputView, ChangeType.APPEND_ONLY, row);
+
+        CompactedLogRecord logRecord =
+                CompactedLogRecord.readFrom(
+                        MemorySegment.wrap(outputView.getCopyOfBuffer()),
+                        0,
+                        1000,
+                        10001,
+                        fieldTypes);
+
+        assertThat(logRecord.getSizeInBytes()).isEqualTo(1 + row.getSizeInBytes() + 4);
+        assertThat(logRecord.logOffset()).isEqualTo(1000);
+        assertThat(logRecord.timestamp()).isEqualTo(10001);
+        assertThat(logRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+        assertThat(logRecord.getRow()).isEqualTo(row);
+    }
+
+    @Test
+    void testWriteToAndReadFromWithRandomData() throws IOException {
+        // generate a compacted row for all supported types
+        DataType[] allColTypes =
+                TestInternalRowGenerator.createAllRowType().getChildren().toArray(new DataType[0]);
+        CompactedRow row = TestInternalRowGenerator.genCompactedRowForAllType();
+
+        CompactedLogRecord.writeTo(outputView, ChangeType.APPEND_ONLY, row);
+
+        LogRecord logRecord =
+                CompactedLogRecord.readFrom(
+                        MemorySegment.wrap(outputView.getCopyOfBuffer()),
+                        0,
+                        1000,
+                        10001,
+                        allColTypes);
+
+        assertThat(logRecord.logOffset()).isEqualTo(1000);
+        assertThat(logRecord.timestamp()).isEqualTo(10001);
+        assertThat(logRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+        assertThat(logRecord.getRow()).isEqualTo(row);
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java
new file mode 100644
index 000000000..2a8d70c80
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsCompactedBuilderTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.fluss.memory.ManagedPagedOutputView;
+import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.fluss.record.ChangeType.APPEND_ONLY;
+import static org.apache.fluss.record.TestData.BASE_OFFSET;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DEFAULT_MAGIC;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link MemoryLogRecordsCompactedBuilder}. */
+class MemoryLogRecordsCompactedBuilderTest {
+
+    @Test
+    void testAppendAndBuild() throws Exception {
+        MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 4, 1024);
+
+        List<CompactedRow> expected = new ArrayList<>();
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}));
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}));
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {3, "c"}));
+
+        for (CompactedRow row : expected) {
+            assertThat(builder.hasRoomFor(row)).isTrue();
+            builder.append(APPEND_ONLY, row);
+        }
+
+        builder.setWriterState(7L, 13);
+        builder.close();
+        MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build());
+
+        Iterator<LogRecordBatch> it = records.batches().iterator();
+        assertThat(it.hasNext()).isTrue();
+        LogRecordBatch batch = it.next();
+        assertThat(it.hasNext()).isFalse();
+
+        assertThat(batch.getRecordCount()).isEqualTo(expected.size());
+        assertThat(batch.baseLogOffset()).isEqualTo(0);
+        assertThat(batch.writerId()).isEqualTo(7L);
+        assertThat(batch.batchSequence()).isEqualTo(13);
+
+        try (LogRecordReadContext ctx =
+                LogRecordReadContext.createCompactedRowReadContext(
+                        DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+                CloseableIterator<LogRecord> recIt = batch.records(ctx)) {
+            for (CompactedRow expRow : expected) {
+                assertThat(recIt.hasNext()).isTrue();
+                LogRecord rec = recIt.next();
+                assertThat(rec.getChangeType()).isEqualTo(APPEND_ONLY);
+                assertThat(rec.getRow()).isEqualTo(expRow);
+            }
+            assertThat(recIt.hasNext()).isFalse();
+        }
+    }
+
+    @Test
+    void testAbortSemantics() throws Exception {
+        MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 2, 512);
+        builder.append(APPEND_ONLY, compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}));
+        builder.abort();
+
+        // append after abort
+        assertThatThrownBy(
+                        () ->
+                                builder.append(
+                                        APPEND_ONLY,
+                                        compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"})))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("MemoryLogRecordsCompactedBuilder has already been aborted");
+
+        // build after abort
+        assertThatThrownBy(builder::build)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Attempting to build an aborted record batch");
+
+        // close after abort
+        assertThatThrownBy(builder::close)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Cannot close MemoryLogRecordsCompactedBuilder as it has already been aborted");
+    }
+
+    @Test
+    void testNoRecordAppendAndBaseOffset() throws Exception {
+        // base offset 0
+        try (MemoryLogRecordsCompactedBuilder builder = createBuilder(0, 1, 1024)) {
+            MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build());
+            assertThat(records.sizeInBytes())
+                    .isEqualTo(
+                            LogRecordBatchFormat.recordBatchHeaderSize(
+                                    DEFAULT_MAGIC)); // only batch header
+            LogRecordBatch batch = records.batches().iterator().next();
+            batch.ensureValid();
+            assertThat(batch.getRecordCount()).isEqualTo(0);
+            assertThat(batch.baseLogOffset()).isEqualTo(0);
+            assertThat(batch.lastLogOffset()).isEqualTo(0);
+            assertThat(batch.nextLogOffset()).isEqualTo(1);
+            try (LogRecordReadContext ctx =
+                    LogRecordReadContext.createCompactedRowReadContext(
+                            DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+                    CloseableIterator<LogRecord> it = batch.records(ctx)) {
+                assertThat(it.hasNext()).isFalse();
+            }
+        }
+
+        // base offset 100
+        try (MemoryLogRecordsCompactedBuilder builder = createBuilder(100, 1, 1024)) {
+            MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build());
+            assertThat(records.sizeInBytes())
+                    .isEqualTo(LogRecordBatchFormat.recordBatchHeaderSize(DEFAULT_MAGIC));
+            LogRecordBatch batch = records.batches().iterator().next();
+            batch.ensureValid();
+            assertThat(batch.getRecordCount()).isEqualTo(0);
+            assertThat(batch.baseLogOffset()).isEqualTo(100);
+            assertThat(batch.lastLogOffset()).isEqualTo(100);
+            assertThat(batch.nextLogOffset()).isEqualTo(101);
+            try (LogRecordReadContext ctx =
+                    LogRecordReadContext.createCompactedRowReadContext(
+                            DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID);
+                    CloseableIterator<LogRecord> it = batch.records(ctx)) {
+                assertThat(it.hasNext()).isFalse();
+            }
+        }
+    }
+
+    @Test
+    void testResetWriterState() throws Exception {
+        MemoryLogRecordsCompactedBuilder builder = createBuilder(BASE_OFFSET, 2, 1024);
+        List<CompactedRow> expected = new ArrayList<>();
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}));
+        expected.add(compactedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}));
+        for (CompactedRow row : expected) {
+            builder.append(APPEND_ONLY, row);
+        }
+        builder.setWriterState(5L, 0);
+        builder.close();
+        MemoryLogRecords records = MemoryLogRecords.pointToBytesView(builder.build());
+        LogRecordBatch batch = records.batches().iterator().next();
+        assertThat(batch.writerId()).isEqualTo(5L);
+        assertThat(batch.batchSequence()).isEqualTo(0);
+
+        // reset writer state and rebuild with new sequence
+        builder.resetWriterState(5L, 1);
+        records = MemoryLogRecords.pointToBytesView(builder.build());
+        batch = records.batches().iterator().next();
+        assertThat(batch.writerId()).isEqualTo(5L);
+        assertThat(batch.batchSequence()).isEqualTo(1);
+    }
+
+    private MemoryLogRecordsCompactedBuilder createBuilder(
+            long baseOffset, int maxPages, int pageSizeInBytes) throws IOException {
+        return MemoryLogRecordsCompactedBuilder.builder(
+                baseOffset,
+                DEFAULT_SCHEMA_ID,
+                maxPages * pageSizeInBytes,
+                DEFAULT_MAGIC,
+                new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes)));
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
index 96bb0eba8..29cbfe5e1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
@@ -55,6 +56,7 @@ public class KvRecoverHelper {
private final long recoverPointOffset;
private final KvRecoverContext recoverContext;
private final KvFormat kvFormat;
+ private final LogFormat logFormat;
     // will be initialized when first encounter a log record during recovering from log
private Integer currentSchemaId;
@@ -72,12 +74,14 @@ public class KvRecoverHelper {
long recoverPointOffset,
KvRecoverContext recoverContext,
KvFormat kvFormat,
+ LogFormat logFormat,
SchemaGetter schemaGetter) {
this.kvTablet = kvTablet;
this.logTablet = logTablet;
this.recoverPointOffset = recoverPointOffset;
this.recoverContext = recoverContext;
this.kvFormat = kvFormat;
+ this.logFormat = logFormat;
this.schemaGetter = schemaGetter;
}
@@ -127,9 +131,7 @@ public class KvRecoverHelper {
FetchIsolation fetchIsolation,
             ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordConsumer)
throws Exception {
- try (LogRecordReadContext readContext =
- LogRecordReadContext.createArrowReadContext(
- currentRowType, currentSchemaId, schemaGetter)) {
+ try (LogRecordReadContext readContext = createLogRecordReadContext()) {
long nextFetchOffset = startFetchOffset;
while (true) {
LogRecords logRecords =
@@ -175,6 +177,18 @@ public class KvRecoverHelper {
}
}
+ private LogRecordReadContext createLogRecordReadContext() {
+ if (logFormat == LogFormat.ARROW) {
+ return LogRecordReadContext.createArrowReadContext(
+ currentRowType, currentSchemaId, schemaGetter);
+ } else if (logFormat == LogFormat.COMPACTED) {
+ return LogRecordReadContext.createCompactedRowReadContext(
+ currentRowType, currentSchemaId);
+ } else {
+            throw new UnsupportedOperationException("Unsupported log format: " + logFormat);
+ }
+ }
+
     // TODO: this is very in-efficient, because the conversion is CPU heavy. Should be optimized in
// the future.
private BinaryRow toKvRow(InternalRow originalRow) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index b6957c5dc..e6542d8f4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -54,6 +54,7 @@ import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
import org.apache.fluss.server.kv.snapshot.RocksIncrementalSnapshot;
import org.apache.fluss.server.kv.wal.ArrowWalBuilder;
+import org.apache.fluss.server.kv.wal.CompactedWalBuilder;
import org.apache.fluss.server.kv.wal.IndexWalBuilder;
import org.apache.fluss.server.kv.wal.WalBuilder;
import org.apache.fluss.server.log.LogAppendInfo;
@@ -442,6 +443,8 @@ public final class KvTablet {
"Primary Key Table with COMPACTED kv format
doesn't support INDEXED cdc log format.");
}
return new IndexWalBuilder(schemaId, memorySegmentPool);
+ case COMPACTED:
+                return new CompactedWalBuilder(schemaId, rowType, memorySegmentPool);
case ARROW:
return new ArrowWalBuilder(
schemaId,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java
new file mode 100644
index 000000000..8c80a48d9
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/wal/CompactedWalBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.wal;
+
+import org.apache.fluss.memory.ManagedPagedOutputView;
+import org.apache.fluss.memory.MemorySegmentPool;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.record.MemoryLogRecordsCompactedBuilder;
+import org.apache.fluss.record.bytesview.BytesView;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.InternalRow.FieldGetter;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.io.IOException;
+
+/** A {@link WalBuilder} that builds a {@link MemoryLogRecords} with Compacted log format. */
+public class CompactedWalBuilder implements WalBuilder {
+
+    private final MemorySegmentPool memorySegmentPool;
+    private final ManagedPagedOutputView outputView;
+    private final MemoryLogRecordsCompactedBuilder recordsBuilder;
+
+    private final CompactedRowEncoder rowEncoder;
+    private final FieldGetter[] fieldGetters;
+    private final int fieldCount;
+
+    public CompactedWalBuilder(int schemaId, RowType rowType, MemorySegmentPool memorySegmentPool)
+            throws IOException {
+        this.memorySegmentPool = memorySegmentPool;
+        this.outputView = new ManagedPagedOutputView(memorySegmentPool);
+        // unlimited write size as we don't know the WAL size in advance
+        this.recordsBuilder =
+                MemoryLogRecordsCompactedBuilder.builder(
+                        schemaId, Integer.MAX_VALUE, outputView, /*appendOnly*/ false);
+        DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
+        this.rowEncoder = new CompactedRowEncoder(fieldTypes);
+        this.fieldGetters = InternalRow.createFieldGetters(rowType);
+        this.fieldCount = rowType.getFieldCount();
+    }
+
+    @Override
+    public void append(ChangeType changeType, InternalRow row) throws Exception {
+        final CompactedRow compactedRow;
+        if (row instanceof CompactedRow) {
+            compactedRow = (CompactedRow) row;
+        } else {
+            rowEncoder.startNewRow();
+            for (int i = 0; i < fieldCount; i++) {
+                rowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(row));
+            }
+            compactedRow = rowEncoder.finishRow();
+        }
+        recordsBuilder.append(changeType, compactedRow);
+    }
+
+    @Override
+    public MemoryLogRecords build() throws Exception {
+        recordsBuilder.close();
+        BytesView bytesView = recordsBuilder.build();
+        // Convert BytesView to MemoryLogRecords (may copy if composite)
+        return MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer());
+    }
+
+    @Override
+    public void setWriterState(long writerId, int batchSequence) {
+        recordsBuilder.setWriterState(writerId, batchSequence);
+    }
+
+    @Override
+    public void deallocate() {
+        memorySegmentPool.returnAll(outputView.allocatedPooledSegments());
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 1a4b6d906..1669e004d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -752,6 +752,7 @@ public final class Replica {
startRecoverLogOffset,
recoverContext,
tableConfig.getKvFormat(),
+ tableConfig.getLogFormat(),
schemaGetter);
kvRecoverHelper.recover();
} catch (Exception e) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 250fc30da..0941e32c1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -221,9 +221,13 @@ public class TableDescriptorValidation {
     private static void checkLogFormat(Configuration tableConf, boolean hasPrimaryKey) {
KvFormat kvFormat = tableConf.get(ConfigOptions.TABLE_KV_FORMAT);
LogFormat logFormat = tableConf.get(ConfigOptions.TABLE_LOG_FORMAT);
-        if (hasPrimaryKey && kvFormat == KvFormat.COMPACTED && logFormat != LogFormat.ARROW) {
+
+        // Allow COMPACTED and ARROW log formats when KV format is COMPACTED for primary key tables
+ if (hasPrimaryKey
+ && kvFormat == KvFormat.COMPACTED
+                && !(logFormat == LogFormat.ARROW || logFormat == LogFormat.COMPACTED)) {
throw new InvalidConfigException(
- "Currently, Primary Key Table only supports ARROW log
format if kv format is COMPACTED.");
+ "Currently, Primary Key Table supports ARROW or COMPACTED
log format when kv format is COMPACTED.");
}
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
index c6adbbbb1..932c17f23 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
@@ -21,7 +21,9 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.exception.NotLeaderOrFollowerException;
+import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.KvRecord;
import org.apache.fluss.record.KvRecordBatch;
@@ -44,7 +46,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static org.apache.fluss.server.testutils.KvTestUtils.assertLookupResponse;
 import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
 import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest;
@@ -81,9 +83,14 @@ class KvReplicaRestoreITCase {
int bucketNum = 3;
List<TableBucket> tableBuckets = new ArrayList<>();
for (int i = 0; i < tableNum; i++) {
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(DATA1_SCHEMA_PK)
+ .distributedBy(3, "a")
+                            .logFormat(i % 2 == 0 ? LogFormat.ARROW : LogFormat.COMPACTED)
+ .build();
TablePath tablePath = TablePath.of("test_db", "test_table_" + i);
- long tableId =
-                    createTable(FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR_PK);
+            long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
for (int bucket = 0; bucket < bucketNum; bucket++) {
tableBuckets.add(new TableBucket(tableId, bucket));
}