Yohahaha commented on code in PR #3317:
URL: https://github.com/apache/fluss/pull/3317#discussion_r3247361120
##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala:
##########
@@ -91,6 +91,86 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends
SparkLakeTableReadTe
}
}
+ test("Spark Lake Read: pk table fallback uses Fluss kv snapshot for log tail
merge") {
+ // Non-partitioned table
+ withTable("t_fb_hybrid") {
+ val tablePath = createTablePath("t_fb_hybrid")
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t_fb_hybrid (id INT, name STRING,
score INT)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${PRIMARY_KEY.key()}' = 'id',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_fb_hybrid VALUES
+ |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+ |""".stripMargin)
+
+ flussServer.triggerAndWaitSnapshot(tablePath)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_fb_hybrid VALUES
+ |(2, "bob_updated", 100), (4, "david", 88)
+ |""".stripMargin)
+
+ val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_fb_hybrid ORDER BY id")
+ val partitions = lakeUpsertInputPartitions(df)
+ assert(
+ partitions.exists(_.snapshotId >= 0),
+ s"Expected at least one fallback partition with snapshotId >= 0, got:
${partitions.mkString(", ")}")
Review Comment:
`fallback` is misleading to me here; `snapshotId >= 0` means we avoid reading
the full history of log records, right?
##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePrimaryKeyTableReadTestBase.scala:
##########
@@ -91,6 +91,86 @@ abstract class SparkLakePrimaryKeyTableReadTestBase extends
SparkLakeTableReadTe
}
}
+ test("Spark Lake Read: pk table fallback uses Fluss kv snapshot for log tail
merge") {
+ // Non-partitioned table
+ withTable("t_fb_hybrid") {
+ val tablePath = createTablePath("t_fb_hybrid")
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t_fb_hybrid (id INT, name STRING,
score INT)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${PRIMARY_KEY.key()}' = 'id',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_fb_hybrid VALUES
+ |(1, "alice", 90), (2, "bob", 85), (3, "charlie", 95)
+ |""".stripMargin)
+
+ flussServer.triggerAndWaitSnapshot(tablePath)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_fb_hybrid VALUES
+ |(2, "bob_updated", 100), (4, "david", 88)
+ |""".stripMargin)
+
+ val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_fb_hybrid ORDER BY id")
+ val partitions = lakeUpsertInputPartitions(df)
+ assert(
+ partitions.exists(_.snapshotId >= 0),
+ s"Expected at least one fallback partition with snapshotId >= 0, got:
${partitions.mkString(", ")}")
+ checkAnswer(
+ df,
+ Row(1, "alice", 90) :: Row(2, "bob_updated", 100) ::
+ Row(3, "charlie", 95) :: Row(4, "david", 88) :: Nil
+ )
+ }
+
+ // Partitioned table
+ withTable("t_fb_hybrid_partitioned") {
+ val tablePath = createTablePath("t_fb_hybrid_partitioned")
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t_fb_hybrid_partitioned (id INT,
name STRING, score INT, dt STRING)
+ | PARTITIONED BY (dt)
+ | TBLPROPERTIES (
+ | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+ | '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+ | '${PRIMARY_KEY.key()}' = 'id,dt',
+ | '${BUCKET_NUMBER.key()}' = 1)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_fb_hybrid_partitioned VALUES
+ |(1, "alice", 90, "2026-01-01"),
+ |(2, "bob", 85, "2026-01-01"),
+ |(3, "charlie", 95, "2026-01-02")
+ |""".stripMargin)
+
+ flussServer.triggerAndWaitSnapshot(tablePath)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t_fb_hybrid_partitioned VALUES
+ |(2, "bob_updated", 100, "2026-01-01"),
+ |(4, "david", 88, "2026-01-02")
+ |""".stripMargin)
+
+ val df = sql(s"SELECT * FROM $DEFAULT_DATABASE.t_fb_hybrid_partitioned
ORDER BY id")
+ val partitions = lakeUpsertInputPartitions(df)
+ assert(
+ partitions.exists(_.snapshotId >= 0),
+ s"Expected at least one fallback partition with snapshotId >= 0, got:
${partitions.mkString(", ")}")
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]