This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d3a935f3b [spark] support merge-read between kv snapshot and log for primary-key table (#2523)
d3a935f3b is described below
commit d3a935f3b148a5de3adc6453207ab252bbf68aae
Author: Yann Byron <[email protected]>
AuthorDate: Sun Feb 1 15:47:43 2026 +0800
[spark] support merge-read between kv snapshot and log for primary-key table (#2523)
---
.../client/table/scanner}/SortMergeReader.java | 97 +++-----
.../client/utils/SingleElementHeadIterator.java | 88 +++++++
.../client/table/scanner}/SortMergeReaderTest.java | 4 +-
.../java/org/apache/fluss/row}/KeyValueRow.java | 5 +-
.../reader/LakeSnapshotAndLogSplitScanner.java | 2 +
.../org/apache/fluss/spark/SparkFlussConf.scala | 9 +
.../scala/org/apache/fluss/spark/SparkTable.scala | 14 +-
.../spark/read/FlussUpsertPartitionReader.scala | 206 ++++++++++++++--
.../fluss/spark/utils/LogChangesIterator.scala | 146 ++++++++++++
.../apache/fluss/spark/FlussSparkTestBase.scala | 23 +-
...kReadTest.scala => SparkLogTableReadTest.scala} | 100 +-------
.../fluss/spark/SparkPrimaryKeyTableReadTest.scala | 262 +++++++++++++++++++++
12 files changed, 740 insertions(+), 216 deletions(-)
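[Editor's note] A quick sketch of the user-facing effect of this change (not part of
the patch; the table name is illustrative). Queries against a primary-key table now
merge the latest kv snapshot with the subsequent change log by default; the previous
snapshot-only behavior stays available behind the existing flag:

    // merged read (new default): snapshot plus log changes, latest value per key
    spark.sql("SELECT * FROM t ORDER BY orderId").show()

    // snapshot-only read (may lag behind the log), as before this patch
    spark.conf.set("spark.sql.fluss.readOptimized", "true")
    spark.sql("SELECT * FROM t ORDER BY orderId").show()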
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java
similarity index 85%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
rename to fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java
index cf9d22925..0be36b584 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.lake.reader;
+package org.apache.fluss.client.table.scanner;
+import org.apache.fluss.client.utils.SingleElementHeadIterator;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.KeyValueRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.utils.CloseableIterator;
@@ -33,11 +35,14 @@ import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.function.Function;
-/** A sort merge reader to merge lakehouse snapshot record and fluss change log. */
-class SortMergeReader {
+/**
+ * A sort merge reader to merge snapshot records and the change log. Both should have the same
+ * primary key encoding when comparing keys.
+ */
+public class SortMergeReader {
private final ProjectedRow snapshotProjectedPkRow;
- private final CloseableIterator<LogRecord> lakeRecordIterator;
+ private final CloseableIterator<LogRecord> snapshotRecordIterator;
private final Comparator<InternalRow> userKeyComparator;
private CloseableIterator<KeyValueRow> changeLogIterator;
@@ -49,13 +54,29 @@ class SortMergeReader {
public SortMergeReader(
@Nullable int[] projectedFields,
int[] pkIndexes,
- List<CloseableIterator<LogRecord>> lakeRecordIterators,
+ @Nullable CloseableIterator<LogRecord> snapshotRecordIterator,
+ Comparator<InternalRow> userKeyComparator,
+ CloseableIterator<KeyValueRow> changeLogIterator) {
+ this(
+ projectedFields,
+ pkIndexes,
+ snapshotRecordIterator == null
+ ? Collections.emptyList()
+ : Collections.singletonList(snapshotRecordIterator),
+ userKeyComparator,
+ changeLogIterator);
+ }
+
+ public SortMergeReader(
+ @Nullable int[] projectedFields,
+ int[] pkIndexes,
+ List<CloseableIterator<LogRecord>> snapshotRecordIterators,
Comparator<InternalRow> userKeyComparator,
CloseableIterator<KeyValueRow> changeLogIterator) {
this.userKeyComparator = userKeyComparator;
this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes);
- this.lakeRecordIterator =
-                ConcatRecordIterator.wrap(lakeRecordIterators, userKeyComparator, pkIndexes);
+ this.snapshotRecordIterator =
+                ConcatRecordIterator.wrap(snapshotRecordIterators, userKeyComparator, pkIndexes);
this.changeLogIterator = changeLogIterator;
this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper();
        this.snapshotMergedRowIteratorWrapper = new SnapshotMergedRowIteratorWrapper();
@@ -65,13 +86,13 @@ class SortMergeReader {
@Nullable
public CloseableIterator<InternalRow> readBatch() {
- if (!lakeRecordIterator.hasNext()) {
+ if (!snapshotRecordIterator.hasNext()) {
return changeLogIterator.hasNext()
? changeLogIteratorWrapper.replace(changeLogIterator)
: null;
} else {
CloseableIterator<SortMergeRows> mergedRecordIterator =
-                    transform(lakeRecordIterator, this::sortMergeWithChangeLog);
+                    transform(snapshotRecordIterator, this::sortMergeWithChangeLog);
            return snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator);
}
@@ -227,64 +248,6 @@ class SortMergeReader {
}
}
-    private static class SingleElementHeadIterator<T> implements CloseableIterator<T> {
- private T singleElement;
- private CloseableIterator<T> inner;
- private boolean singleElementReturned;
-
-        public SingleElementHeadIterator(T element, CloseableIterator<T> inner) {
- this.singleElement = element;
- this.inner = inner;
- this.singleElementReturned = false;
- }
-
- public static <T> SingleElementHeadIterator<T> addElementToHead(
- T firstElement, CloseableIterator<T> originElementIterator) {
- if (originElementIterator instanceof SingleElementHeadIterator) {
- SingleElementHeadIterator<T> singleElementHeadIterator =
- (SingleElementHeadIterator<T>) originElementIterator;
-                singleElementHeadIterator.set(firstElement, singleElementHeadIterator.inner);
- return singleElementHeadIterator;
- } else {
-                return new SingleElementHeadIterator<>(firstElement, originElementIterator);
- }
- }
-
- public void set(T element, CloseableIterator<T> inner) {
- this.singleElement = element;
- this.inner = inner;
- this.singleElementReturned = false;
- }
-
- @Override
- public boolean hasNext() {
- return !singleElementReturned || inner.hasNext();
- }
-
- @Override
- public T next() {
- if (singleElementReturned) {
- return inner.next();
- }
- singleElementReturned = true;
- return singleElement;
- }
-
- public T peek() {
- if (singleElementReturned) {
- this.singleElement = inner.next();
- this.singleElementReturned = false;
- return this.singleElement;
- }
- return singleElement;
- }
-
- @Override
- public void close() {
- inner.close();
- }
- }
-
    private static class ChangeLogIteratorWrapper implements CloseableIterator<InternalRow> {
private CloseableIterator<KeyValueRow> changeLogRecordIterator;
private KeyValueRow nextReturnRow;
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/SingleElementHeadIterator.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/SingleElementHeadIterator.java
new file mode 100644
index 000000000..876e0f1fb
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/SingleElementHeadIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.utils.CloseableIterator;
+
+/**
+ * A {@link CloseableIterator} that wraps another iterator with a single element prepended at the
+ * head.
+ *
+ * <p>This iterator first returns the single element, then delegates to the inner iterator for
+ * subsequent elements. It can be reused by calling {@link #set(Object, CloseableIterator)} to
+ * replace the head element and inner iterator.
+ */
+public class SingleElementHeadIterator<T> implements CloseableIterator<T> {
+ private T singleElement;
+
+ private CloseableIterator<T> inner;
+
+ private boolean singleElementReturned;
+
+ public SingleElementHeadIterator(T element, CloseableIterator<T> inner) {
+ this.singleElement = element;
+ this.inner = inner;
+ this.singleElementReturned = false;
+ }
+
+ public static <T> SingleElementHeadIterator<T> addElementToHead(
+ T firstElement, CloseableIterator<T> originElementIterator) {
+ if (originElementIterator instanceof SingleElementHeadIterator) {
+ SingleElementHeadIterator<T> singleElementHeadIterator =
+ (SingleElementHeadIterator<T>) originElementIterator;
+            singleElementHeadIterator.set(firstElement, singleElementHeadIterator.inner);
+ return singleElementHeadIterator;
+ } else {
+            return new SingleElementHeadIterator<>(firstElement, originElementIterator);
+ }
+ }
+
+ public void set(T element, CloseableIterator<T> inner) {
+ this.singleElement = element;
+ this.inner = inner;
+ this.singleElementReturned = false;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !singleElementReturned || inner.hasNext();
+ }
+
+ @Override
+ public T next() {
+ if (singleElementReturned) {
+ return inner.next();
+ }
+ singleElementReturned = true;
+ return singleElement;
+ }
+
+ public T peek() {
+ if (singleElementReturned) {
+ this.singleElement = inner.next();
+ this.singleElementReturned = false;
+ return this.singleElement;
+ }
+ return singleElement;
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+}
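[Editor's note] A minimal sketch (not part of the patch) of the push-back pattern the
extracted SingleElementHeadIterator enables: after over-reading one element past a
group boundary, addElementToHead prepends it again so the next group sees it first,
reusing the wrapper in place when the iterator is already a SingleElementHeadIterator:

    import org.apache.fluss.client.utils.SingleElementHeadIterator
    import org.apache.fluss.utils.CloseableIterator
    import scala.collection.JavaConverters._

    var it: CloseableIterator[Integer] =
      CloseableIterator.wrap(Iterator[Integer](1, 1, 2, 3).asJava)
    it.next(); it.next()      // consume the first group: 1, 1
    val overRead = it.next()  // 2, the first element of the next group
    it = SingleElementHeadIterator.addElementToHead(overRead, it)
    assert(it.next() == 2)    // the over-read element comes back first
    assert(it.next() == 3)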
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/SortMergeReaderTest.java
similarity index 98%
rename from fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java
rename to fluss-client/src/test/java/org/apache/fluss/client/table/scanner/SortMergeReaderTest.java
index edeee5829..64bb958e5 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/SortMergeReaderTest.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.lake.reader;
+package org.apache.fluss.client.table.scanner;
-import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.KeyValueRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.RowType;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/KeyValueRow.java b/fluss-common/src/main/java/org/apache/fluss/row/KeyValueRow.java
similarity index 91%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/KeyValueRow.java
rename to fluss-common/src/main/java/org/apache/fluss/row/KeyValueRow.java
index f8e7bd479..4c7ed024e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/KeyValueRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/KeyValueRow.java
@@ -16,10 +16,7 @@
* limitations under the License.
*/
-package org.apache.fluss.flink.lake.reader;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+package org.apache.fluss.row;
/** An {@link InternalRow} with the key part. */
public class KeyValueRow {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
index 9d3ce7379..0caf10d55 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.lake.reader;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.SortMergeReader;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
@@ -32,6 +33,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.KeyValueRow;
import org.apache.fluss.utils.CloseableIterator;
import javax.annotation.Nullable;
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
index 5148e1e1c..f71e2c229 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -17,6 +17,9 @@
package org.apache.fluss.spark
+import org.apache.fluss.config.ConfigBuilder.key
+import org.apache.fluss.config.ConfigOption
+
import org.apache.spark.sql.internal.SQLConf.buildConf
object SparkFlussConf {
@@ -27,4 +30,10 @@ object SparkFlussConf {
.booleanConf
.createWithDefault(false)
+ val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
+ key(READ_OPTIMIZED.key)
+ .booleanType()
+ .defaultValue(READ_OPTIMIZED.defaultValue.get)
+ .withDescription(READ_OPTIMIZED.doc)
+
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 028af2815..53ed8a091 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -20,6 +20,7 @@ package org.apache.fluss.spark
import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.spark.SparkFlussConf.READ_OPTIMIZED
import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussUpsertScanBuilder}
import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder}
@@ -28,7 +29,6 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class SparkTable(
@@ -54,12 +54,14 @@ class SparkTable(
if (tableInfo.getPrimaryKeys.isEmpty) {
new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
} else {
- if (!conf.getConf(SparkFlussConf.READ_OPTIMIZED, false)) {
- throw new UnsupportedOperationException(
-          "For now, only data in snapshot can be read, without merging them with changes. " +
-            "If you can accept it, please set `spark.sql.fluss.readOptimized` true, and execute query again.")
+      val newFlussConfig = if (conf.getConf(SparkFlussConf.READ_OPTIMIZED, false)) {
+ val newFlussConfig_ = new FlussConfiguration(flussConfig)
+        newFlussConfig_.setBoolean(SparkFlussConf.READ_OPTIMIZED_OPTION.key(), true)
+ newFlussConfig_
+ } else {
+ flussConfig
}
- new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
+ new FlussUpsertScanBuilder(tablePath, tableInfo, options, newFlussConfig)
}
}
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
index 52cd46652..c48a82dbd 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -17,30 +17,70 @@
package org.apache.fluss.spark.read
+import org.apache.fluss.client.table.scanner.{ScanRecord, SortMergeReader}
import org.apache.fluss.client.table.scanner.batch.BatchScanner
+import org.apache.fluss.client.table.scanner.log.LogScanner
import org.apache.fluss.config.Configuration
+import org.apache.fluss.memory.MemorySegment
import org.apache.fluss.metadata.{TableBucket, TablePath}
+import org.apache.fluss.record.LogRecord
+import org.apache.fluss.row.{encode, InternalRow, KeyValueRow}
+import org.apache.fluss.spark.SparkFlussConf
+import org.apache.fluss.spark.utils.LogChangesIterator
+import org.apache.fluss.utils.CloseableIterator
-import java.util
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util.Comparator
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
/**
* Partition reader that reads primary key table data.
*
- * For now, only data in snapshot can be read, without merging them with changes.
+ * Reads both snapshot and log data, merging them with a sort-merge algorithm. For rows with the
+ * same primary key, log data takes precedence over snapshot data.
*/
class FlussUpsertPartitionReader(
tablePath: TablePath,
projection: Array[Int],
flussPartition: FlussUpsertInputPartition,
flussConfig: Configuration)
- extends FlussPartitionReader(tablePath, flussConfig) {
+ extends FlussPartitionReader(tablePath, flussConfig)
+ with Logging {
+  private val readOptimized = flussConfig.get(SparkFlussConf.READ_OPTIMIZED_OPTION)
private val tableBucket: TableBucket = flussPartition.tableBucket
private val snapshotId: Long = flussPartition.snapshotId
+ private val logStartingOffset: Long = flussPartition.logStartingOffset
+ private val logStoppingOffset: Long = flussPartition.logStoppingOffset
+  private val logScanFinished = logStartingOffset >= logStoppingOffset || logStoppingOffset <= 0
+
+ private val (projectionWithPks, pkProjection) = {
+ val pkIndexes = tableInfo.getSchema.getPrimaryKeyIndexes
+ var i = 0
+ val _pkProjection = mutable.ArrayBuffer.empty[Int]
+ val extraProjections = mutable.ArrayBuffer.empty[Int]
+ extraProjections ++= projection
+ pkIndexes.foreach {
+ pkIndex =>
+ projection.find(p => p == pkIndex) match {
+ case Some(index) =>
+ _pkProjection += index
+ case _ =>
+ extraProjections += pkIndex
+ _pkProjection += projection.length + i
+ i += 1
+ }
+ }
+ (extraProjections.toArray, _pkProjection.toArray)
+ }
- // KV Snapshot Reader (if snapshot exists)
private var snapshotScanner: BatchScanner = _
-  private var snapshotIterator: util.Iterator[org.apache.fluss.row.InternalRow] = _
+ private var logScanner: LogScanner = _
+ private var mergedIterator: Iterator[InternalRow] = _
// initialize scanners
initialize()
@@ -50,39 +90,153 @@ class FlussUpsertPartitionReader(
return false
}
- if (snapshotIterator == null || !snapshotIterator.hasNext) {
- // Try to get next batch from snapshot scanner
- val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
- if (batch == null) {
- // No more data fetched.
- false
+ if (mergedIterator.hasNext) {
+ currentRow = convertToSparkRow(mergedIterator.next())
+ true
+ } else {
+ false
+ }
+ }
+
+ private def createSortMergeReader(): SortMergeReader = {
+ // Create key encoder for primary keys
+ val keyEncoder =
+ encode.KeyEncoder.ofPrimaryKeyEncoder(
+ rowType,
+ tableInfo.getPhysicalPrimaryKeys,
+ tableInfo.getTableConfig,
+ tableInfo.isDefaultBucketKey)
+
+ // Create comparators based on primary key
+ val comparator = new Comparator[InternalRow] {
+ override def compare(o1: InternalRow, o2: InternalRow): Int = {
+ val key1 = keyEncoder.encodeKey(o1)
+ val key2 = keyEncoder.encodeKey(o2)
+        MemorySegment.wrap(key1).compare(MemorySegment.wrap(key2), 0, 0, key1.length)
+ }
+ }
+
+ def createLogChangesIterator(): LogChangesIterator = {
+ // Initialize the log scanner
+      logScanner = table.newScan().project(projectionWithPks).createLogScanner()
+ if (tableBucket.getPartitionId == null) {
+ logScanner.subscribe(tableBucket.getBucket, logStartingOffset)
} else {
- snapshotIterator = batch
- if (snapshotIterator.hasNext) {
- currentRow = convertToSparkRow(snapshotIterator.next())
- true
- } else {
- // Poll a new batch
- next()
+        logScanner.subscribe(tableBucket.getPartitionId, tableBucket.getBucket, logStartingOffset)
+ }
+
+ // Collect all log records until logStoppingOffset
+ val allLogRecords = mutable.ArrayBuffer[ScanRecord]()
+ var continue = true
+
+ while (continue) {
+ val records = logScanner.poll(POLL_TIMEOUT)
+ if (!records.isEmpty) {
+ val flatRecords = records.asScala
+ for (scanRecord <- flatRecords) {
+            // The record at offset logStoppingOffset - 1 may not exist, so also stop on any later offset.
+ if (scanRecord.logOffset() < logStoppingOffset - 1) {
+ allLogRecords += scanRecord
+ } else if (scanRecord.logOffset() == logStoppingOffset - 1) {
+ allLogRecords += scanRecord
+ continue = false
+ } else {
+              continue = false // Stop once we pass the stopping offset
+ }
+ }
}
}
+
+ LogChangesIterator(allLogRecords.toArray, pkProjection, comparator)
+ }
+
+ def createSnapshotIterator(): CloseableIterator[LogRecord] = {
+ // Initialize the snapshot scanner
+ snapshotScanner =
+        table.newScan().project(projectionWithPks).createBatchScanner(tableBucket, snapshotId)
+
+ // Convert snapshot iterator to LogRecord iterator for SortMergeReader
+ new CloseableIterator[LogRecord] {
+ private var currentBatch: java.util.Iterator[InternalRow] = _
+ private var hasMoreBatches = true
+
+ override def hasNext: Boolean = {
+        while ((currentBatch == null || !currentBatch.hasNext) && hasMoreBatches) {
+ val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+ if (batch == null) {
+ hasMoreBatches = false
+ } else {
+ currentBatch = batch
+ }
+ }
+ currentBatch != null && currentBatch.hasNext
+ }
+
+ override def next(): LogRecord = {
+ // Convert InternalRow to LogRecord using GenericRecord
+ new ScanRecord(currentBatch.next())
+ }
+
+ override def close(): Unit = {
+ snapshotScanner.close()
+ }
+ }
+
+ }
+
+ // Get log iterator with proper filtering and sorting
+ val logIterator = if (logScanFinished || readOptimized) {
+ CloseableIterator.emptyIterator[KeyValueRow]()
} else {
- // Get data from current snapshot batch
- currentRow = convertToSparkRow(snapshotIterator.next())
- true
+ createLogChangesIterator()
}
+
+ val snapshotIterators = if (snapshotId == -1) {
+ null
+ } else {
+ createSnapshotIterator()
+ }
+
+ // Create the SortMergeReader
+ val sortMergeReader = new SortMergeReader(
+ projectionWithPks,
+ pkProjection,
+ snapshotIterators,
+ comparator,
+ logIterator
+ )
+ sortMergeReader
}
private def initialize(): Unit = {
- // Initialize Scanners
- if (snapshotId >= 0) {
- // Create batch scanner
- snapshotScanner =
-        table.newScan().project(projection).createBatchScanner(tableBucket, snapshotId)
+ val currentTs = System.currentTimeMillis()
+
+ val sortMergeReader = createSortMergeReader()
+
+ // Get the merged result iterator
+ val mergedResult = sortMergeReader.readBatch()
+
+ // If merged result is null, return an empty iterator
+ mergedIterator = if (mergedResult == null) {
+ Iterator.empty
+ } else {
+ mergedResult.asScala
}
+ val spend = (System.currentTimeMillis() - currentTs) / 1000
+    logInfo(s"Initialize FlussUpsertPartitionReader cost ${spend}s")
}
override def close0(): Unit = {
+ if (mergedIterator != null) {
+ mergedIterator match {
+ case closeable: AutoCloseable => closeable.close()
+ case _ => // Do nothing
+ }
+ }
+
+ if (logScanner != null) {
+ logScanner.close()
+ }
if (snapshotScanner != null) {
snapshotScanner.close()
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
new file mode 100644
index 000000000..2c5871c84
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.utils
+
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.client.utils.SingleElementHeadIterator
+import org.apache.fluss.record.ChangeType
+import org.apache.fluss.row.{InternalRow, KeyValueRow, ProjectedRow}
+import org.apache.fluss.utils.CloseableIterator
+
+import java.util.Comparator
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * An iterator that processes log records and merges changes for rows with the same primary key.
+ *
+ * This iterator sorts log records by primary key and then by offset, groups records with the same
+ * primary key together, and merges them according to change type semantics. The merging logic
+ * follows these rules:
+ * - INSERT: replaces previous state
+ * - DELETE: removes the record
+ * - UPDATE_BEFORE: ignored during merge
+ * - UPDATE_AFTER: replaces previous state
+ *
+ * The iterator outputs [[KeyValueRow]] instances representing the final state after merging all
+ * changes for each primary key.
+ */
+case class LogChangesIterator(
+ logRecords: Array[ScanRecord],
+ pkProjection: Array[Int],
+ comparator: Comparator[InternalRow]
+) extends CloseableIterator[KeyValueRow] {
+
+ // Sort the records by primary key and then by offset
+ private val sortedLogRecords = logRecords.sortWith {
+ case (record1, record2) =>
+ val keyComparison = comparator.compare(record1.getRow, record2.getRow)
+ if (keyComparison == 0) {
+        record1.logOffset() < record2.logOffset() // For same key, lower offset comes first
+ } else {
+ keyComparison < 0
+ }
+ }
+
+ private var recordsIterator = SingleElementHeadIterator.addElementToHead(
+ sortedLogRecords.head,
+ CloseableIterator.wrap(sortedLogRecords.tail.toIterator.asJava))
+
+ private val projectRow1 = ProjectedRow.from(pkProjection)
+ private val projectRow2 = ProjectedRow.from(pkProjection)
+
+ private var currentScanRecord: ScanRecord = _
+
+ override def hasNext: Boolean = {
+ if (currentScanRecord != null) {
+ return true
+ }
+ if (!recordsIterator.hasNext) {
+ return false
+ }
+
+ val recordsWithSamePK = mutable.ArrayBuffer(recordsIterator.next())
+ val current = recordsWithSamePK.head
+
+ var continue = true
+ while (continue && recordsIterator.hasNext) {
+ val next = recordsIterator.next()
+ if (hasSamePrimaryKey(current, next)) {
+ recordsWithSamePK.append(next)
+ } else {
+        recordsIterator = SingleElementHeadIterator.addElementToHead(next, recordsIterator)
+ continue = false
+ }
+ }
+
+ mergeLogRecordsWithSamePK(recordsWithSamePK.toArray) match {
+ case Some(record) =>
+ currentScanRecord = record
+ true
+ case None =>
+ hasNext
+ }
+ }
+
+ override def next(): KeyValueRow = {
+ assert(currentScanRecord != null)
+
+ val result = new KeyValueRow(
+ pkProjection,
+ currentScanRecord.getRow,
+ isDelete(currentScanRecord.getChangeType)
+ )
+ currentScanRecord = null
+ result
+ }
+
+ private def hasSamePrimaryKey(s1: ScanRecord, s2: ScanRecord): Boolean = {
+ comparator.compare(
+ projectRow1.replaceRow(s1.getRow),
+ projectRow2.replaceRow(s2.getRow)
+ ) == 0
+ }
+
+  private def mergeLogRecordsWithSamePK(logRecords: Array[ScanRecord]): Option[ScanRecord] = {
+ var result: Option[ScanRecord] = Some(logRecords.head)
+ logRecords.tail.foreach {
+ record =>
+ record.getChangeType match {
+ case ChangeType.INSERT =>
+ result = Some(record)
+ case ChangeType.DELETE =>
+ result = None
+ case ChangeType.UPDATE_BEFORE =>
+ // Ignore
+ case ChangeType.UPDATE_AFTER =>
+ result = Some(record)
+ case _ =>
+            throw new IllegalArgumentException(s"Unknown change type: ${record.getChangeType}")
+ }
+ }
+ result
+ }
+
+ private def isDelete(changeType: ChangeType): Boolean = {
+ changeType == ChangeType.DELETE || changeType == ChangeType.UPDATE_BEFORE
+ }
+
+ override def close(): Unit = {}
+}
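[Editor's note] The per-key merge rules documented above, reduced to a self-contained
sketch (not part of the patch; the real iterator additionally carries delete markers
through KeyValueRow rather than dropping keys eagerly):

    import org.apache.fluss.record.ChangeType
    import org.apache.fluss.record.ChangeType._

    // Fold one key's change sequence (already sorted by offset) into its final value.
    def fold(changes: Seq[(ChangeType, String)]): Option[String] =
      changes.foldLeft(Option.empty[String]) {
        case (_, (INSERT, v))          => Some(v) // insert sets the state
        case (_, (UPDATE_AFTER, v))    => Some(v) // post-image replaces the state
        case (acc, (UPDATE_BEFORE, _)) => acc     // pre-image carries no new state
        case (_, (DELETE, _))          => None    // delete drops the key
        case (_, (other, _)) =>
          throw new IllegalArgumentException(s"Unknown change type: $other")
      }

    assert(fold(Seq((INSERT, "a"), (UPDATE_BEFORE, "a"), (UPDATE_AFTER, "b"))).contains("b"))
    assert(fold(Seq((INSERT, "a"), (DELETE, "a"))).isEmpty)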
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 4bdf5e84d..688ae872c 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -22,10 +22,11 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.client.table.Table
import org.apache.fluss.client.table.scanner.log.LogScanner
import org.apache.fluss.config.{ConfigOptions, Configuration}
-import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
+import org.apache.fluss.metadata.{TableDescriptor, TablePath}
import org.apache.fluss.row.InternalRow
import org.apache.fluss.server.testutils.FlussClusterExtension
+import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
@@ -41,27 +42,25 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
protected var conn: Connection = _
protected var admin: Admin = _
- val flussServer: FlussClusterExtension =
+ protected val flussServer: FlussClusterExtension =
FlussClusterExtension.builder
.setClusterConf(flussConf)
.setNumOfTabletServers(3)
.build
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+      .set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName)
+      .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", flussServer.getBootstrapServers)
+ .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+ }
+
override protected def beforeAll(): Unit = {
- super.beforeAll()
flussServer.start()
conn = ConnectionFactory.createConnection(flussServer.getClientConfig)
admin = conn.getAdmin
-    spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName)
- spark.conf.set(
- s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
- flussServer.getBootstrapServers)
- spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
- // Enable read optimized by default temporarily.
-    // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done.
- spark.conf.set("spark.sql.fluss.readOptimized", "true")
-
+ super.beforeAll()
sql(s"USE $DEFAULT_DATABASE")
}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
similarity index 68%
rename from fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
rename to fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
index 834cec7ad..5f9613cbc 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
@@ -19,7 +19,7 @@ package org.apache.fluss.spark
import org.apache.spark.sql.Row
-class SparkReadTest extends FlussSparkTestBase {
+class SparkLogTableReadTest extends FlussSparkTestBase {
test("Spark Read: log table") {
withTable("t") {
@@ -80,48 +80,6 @@ class SparkReadTest extends FlussSparkTestBase {
}
}
- test("Spark Read: primary key table") {
- withTable("t") {
- sql(s"""
-        |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING)
- |TBLPROPERTIES("primary.key" = "orderId")
- |""".stripMargin)
-
- sql(s"""
- |INSERT INTO $DEFAULT_DATABASE.t VALUES
- |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
- |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
- |(1000L, 25L, 605, "addr5")
- |""".stripMargin)
-
- checkAnswer(
- sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
- Row(600L, 21L, 601, "addr1") ::
- Row(700L, 22L, 602, "addr2") ::
- Row(800L, 23L, 603, "addr3") ::
- Row(900L, 24L, 604, "addr4") ::
- Row(1000L, 25L, 605, "addr5") :: Nil
- )
-
- sql(s"""
- |INSERT INTO $DEFAULT_DATABASE.t VALUES
- |(700L, 220L, 602, "addr2"),
- |(900L, 240L, 604, "addr4"),
- |(1100L, 260L, 606, "addr6")
- |""".stripMargin)
-
- checkAnswer(
- sql(s"""
- |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
- |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
- Row(600L, 21L, "addr1") ::
- Row(700L, 220L, "addr2") ::
- Row(800L, 23L, "addr3") ::
- Nil
- )
- }
- }
-
test("Spark Read: partitioned log table") {
withTable("t") {
sql(
@@ -168,62 +126,6 @@ class SparkReadTest extends FlussSparkTestBase {
}
}
- test("Spark Read: partitioned primary key table") {
- withTable("t") {
- sql(s"""
-        |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING)
- |PARTITIONED BY (dt)
- |TBLPROPERTIES("primary.key" = "orderId,dt")
- |""".stripMargin)
-
- sql(s"""
- |INSERT INTO $DEFAULT_DATABASE.t VALUES
-        |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"),
-        |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, "addr4", "2026-01-02"),
- |(1000L, 25L, 605, "addr5", "2026-01-03")
- |""".stripMargin)
- sql(s"""
- |INSERT INTO $DEFAULT_DATABASE.t VALUES
- |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
- |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
- |(1100L, 260L, 606, "addr6", "2026-01-03")
- |""".stripMargin)
-
- checkAnswer(
- sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
- Row(600L, 21L, 601, "addr1", "2026-01-01") ::
- Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
- Row(800L, 23L, 603, "addr3", "2026-01-02") ::
- Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
- Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
- Row(1100L, 260L, 606, "addr6", "2026-01-03") ::
- Nil
- )
-
- // Read with partition filter
- checkAnswer(
- sql(s"""
- |SELECT * FROM $DEFAULT_DATABASE.t
- |WHERE dt = '2026-01-01'
- |ORDER BY orderId""".stripMargin),
- Row(600L, 21L, 601, "addr1", "2026-01-01") ::
- Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
- Nil
- )
-
- // Read with multiple partition filters
- checkAnswer(
- sql(
-        s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt IN ('2026-01-01', '2026-01-02') ORDER BY orderId"),
- Row(600L, 21L, 601, "addr1", "2026-01-01") ::
- Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
- Row(800L, 23L, 603, "addr3", "2026-01-02") ::
- Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
- Nil
- )
- }
- }
-
test("Spark: all data types") {
withTable("t") {
sql(s"""
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
new file mode 100644
index 000000000..c1cb02236
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+import org.apache.fluss.spark.read.FlussUpsertInputPartition
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+import scala.collection.JavaConverters._
+
+/** This test case verifies the correctness of primary key table reads. */
+class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
+
+ /**
+   * Do not set [[ConfigOptions.KV_SNAPSHOT_INTERVAL]] here; we want to trigger the snapshot manually.
+ */
+ override def flussConf: Configuration = {
+ new Configuration()
+ }
+
+ test("Spark Read: primary key table") {
+ withTable("t") {
+ val tablePath = createTablePath("t")
+ sql(s"""
+        |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING)
+ |TBLPROPERTIES("primary.key" = "orderId", "bucket.num" = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+ |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+ |(1000L, 25L, 605, "addr5")
+ |""".stripMargin)
+
+ var inputPartitions = genInputPartition(tablePath, null)
+ // Data is only stored in log.
+ assertThat(inputPartitions.exists(hasSnapshotData)).isEqualTo(false)
+ assertThat(inputPartitions.forall(hasLogChanges)).isEqualTo(true)
+ // Read data from log scanner.
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1") ::
+ Row(700L, 22L, 602, "addr2") ::
+ Row(800L, 23L, 603, "addr3") ::
+ Row(900L, 24L, 604, "addr4") ::
+ Row(1000L, 25L, 605, "addr5") :: Nil
+ )
+
+ // Trigger snapshot.
+ flussServer.triggerAndWaitSnapshot(tablePath)
+ inputPartitions = genInputPartition(tablePath, null)
+ assertThat(inputPartitions.forall(hasSnapshotData)).isEqualTo(true)
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1") ::
+ Row(700L, 22L, 602, "addr2") ::
+ Row(800L, 23L, 603, "addr3") ::
+ Row(900L, 24L, 604, "addr4") ::
+ Row(1000L, 25L, 605, "addr5") :: Nil
+ )
+
+ // Upsert.
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(700L, 220L, 602, "addr2"),
+ |(900L, 240L, 604, "addr4"),
+ |(1100L, 260L, 606, "addr6")
+ |""".stripMargin)
+
+ inputPartitions = genInputPartition(tablePath, null)
+ // Data is stored in both snapshot and log.
+ assertThat(inputPartitions.exists(hasSnapshotData)).isEqualTo(true)
+ assertThat(inputPartitions.exists(hasLogChanges)).isEqualTo(true)
+ checkAnswer(
+ sql(s"""
+ |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+ |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+ Row(600L, 21L, "addr1") ::
+ Row(700L, 220L, "addr2") ::
+ Row(800L, 23L, "addr3") ::
+ Nil
+ )
+ withSQLConf(SparkFlussConf.READ_OPTIMIZED.key -> "true") {
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1") ::
+ Row(700L, 22L, 602, "addr2") ::
+ Row(800L, 23L, 603, "addr3") ::
+ Row(900L, 24L, 604, "addr4") ::
+ Row(1000L, 25L, 605, "addr5") :: Nil
+ )
+ }
+
+ // Trigger snapshot.
+ flussServer.triggerAndWaitSnapshot(tablePath)
+ checkAnswer(
+ sql(s"""
+ |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+ |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+ Row(600L, 21L, "addr1") ::
+ Row(700L, 220L, "addr2") ::
+ Row(800L, 23L, "addr3") ::
+ Nil
+ )
+ }
+ }
+
+ test("Spark Read: partitioned primary key table") {
+ withTable("t") {
+ val tablePath = createTablePath("t")
+ sql(s"""
+        |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING)
+ |PARTITIONED BY (dt)
+ |TBLPROPERTIES("primary.key" = "orderId,dt", "bucket.num" = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+        |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"),
+        |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, "addr4", "2026-01-02"),
+ |(1000L, 25L, 605, "addr5", "2026-01-03")
+ |""".stripMargin)
+
+      var inputPartitions = admin.listPartitionInfos(tablePath).get().asScala.flatMap {
+ p => genInputPartition(tablePath, p.getPartitionName)
+ }
+ // Data is only in log.
+ assertThat(inputPartitions.exists(hasSnapshotData)).isEqualTo(false)
+ assertThat(inputPartitions.forall(hasLogChanges)).isEqualTo(true)
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+ Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+ Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+ Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+ Nil
+ )
+
+ // Trigger snapshot.
+ flussServer.triggerAndWaitSnapshot(tablePath)
+ var inputPartition0 = genInputPartition(tablePath, "2026-01-01").head
+ assertThat(hasSnapshotData(inputPartition0)).isEqualTo(true)
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+ Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+ Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+ Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+ Nil
+ )
+
+ // Upsert.
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
+ |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
+ |(1100L, 260L, 606, "addr6", "2026-01-03")
+ |""".stripMargin)
+
+ inputPartition0 = genInputPartition(tablePath, "2026-01-01").head
+ // Data(2026-01-01, bucketId=0) is stored in both snapshot and log.
+ assertThat(hasSnapshotData(inputPartition0)).isEqualTo(true)
+ assertThat(hasLogChanges(inputPartition0)).isEqualTo(true)
+ // Read with partition filter
+ checkAnswer(
+ sql(s"""
+ |SELECT * FROM $DEFAULT_DATABASE.t
+ |WHERE dt = '2026-01-01'
+ |ORDER BY orderId""".stripMargin),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+ Nil
+ )
+
+ // Trigger a bucket snapshot.
+ flussServer.triggerAndWaitSnapshot(inputPartition0.tableBucket)
+ checkAnswer(
+ sql(s"""
+ |SELECT * FROM $DEFAULT_DATABASE.t
+ |WHERE dt = '2026-01-01'
+ |ORDER BY orderId""".stripMargin),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+ Nil
+ )
+
+ // Trigger snapshot.
+ flussServer.triggerAndWaitSnapshot(tablePath)
+      inputPartitions = admin.listPartitionInfos(tablePath).get().asScala.flatMap {
+ p => genInputPartition(tablePath, p.getPartitionName)
+ }
+ assertThat(inputPartitions.forall(hasSnapshotData)).isEqualTo(true)
+ // Read with multiple partition filters
+ checkAnswer(
+ sql(
+          s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt IN ('2026-01-01', '2026-01-02') ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+ Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+ Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+ Nil
+ )
+ }
+ }
+
+ private def genInputPartition(
+ tablePath: TablePath,
+ partitionName: String): Array[FlussUpsertInputPartition] = {
+ val kvSnapshots = if (partitionName == null) {
+ admin.getLatestKvSnapshots(tablePath).get()
+ } else {
+ admin.getLatestKvSnapshots(tablePath, partitionName).get()
+ }
+    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
+ val latestOffsetsInitializer = OffsetsInitializer.latest()
+ val tableId = kvSnapshots.getTableId
+ val partitionId = kvSnapshots.getPartitionId
+ val bucketIds = kvSnapshots.getBucketIds
+ val bucketIdToLogOffset =
+      latestOffsetsInitializer.getBucketOffsets(partitionName, bucketIds, bucketOffsetsRetriever)
+ bucketIds.asScala.map {
+ bucketId =>
+ val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+ val snapshotId = kvSnapshots.getSnapshotId(bucketId).orElse(-1L)
+ val logStartingOffset = kvSnapshots.getLogOffset(bucketId).orElse(-2L)
+ val logEndingOffset = bucketIdToLogOffset.get(bucketId)
+
+        FlussUpsertInputPartition(tableBucket, snapshotId, logStartingOffset, logEndingOffset)
+ }.toArray
+ }
+
+  private def hasLogChanges(inputPartition: FlussUpsertInputPartition): Boolean = {
+    inputPartition.logStoppingOffset > 0 && inputPartition.logStartingOffset < inputPartition.logStoppingOffset
+ }
+
+  private def hasSnapshotData(inputPartition: FlussUpsertInputPartition): Boolean = {
+ inputPartition.snapshotId >= 0
+ }
+}