This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4812fd2b21 [spark] Fallback to spark except query if increment query
with rescale bucket (#4989)
4812fd2b21 is described below
commit 4812fd2b21db12bc502315aa483087473402ecb5
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 23 19:21:14 2025 +0800
[spark] Fallback to spark except query if increment query with rescale
bucket (#4989)
---
.../snapshot/IncrementalTagStartingScanner.java | 4 +-
.../table/source/snapshot/TimeTravelUtil.java | 45 +++++++++++---
.../apache/paimon/flink/BatchFileStoreITCase.java | 3 +-
.../plans/logical/PaimonTableValuedFunctions.scala | 70 ++++++++++++++++++++--
.../spark/sql/TableValuedFunctionsTest.scala | 56 +++++++++++++++++
5 files changed, 160 insertions(+), 18 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 835b7595a3..388b36fb28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -57,8 +57,8 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner {
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch()),
- start.schemaId(),
- end.schemaId());
+ start,
+ end);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 4a0f4290df..d6231721de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -128,15 +128,18 @@ public class TimeTravelUtil {
}
public static void checkRescaleBucketForIncrementalTagQuery(
- SchemaManager schemaManager, long schemaId1, long schemaId2) {
- if (schemaId1 != schemaId2) {
- int bucketNumber1 = bucketNumber(schemaManager, schemaId1);
- int bucketNumber2 = bucketNumber(schemaManager, schemaId2);
- checkArgument(
- bucketNumber1 == bucketNumber2,
- "The bucket number of two tags are different (%s, %s),
which is not supported in incremental tag query.",
- bucketNumber1,
- bucketNumber2);
+ SchemaManager schemaManager, Snapshot start, Snapshot end) {
+ if (start.schemaId() != end.schemaId()) {
+ int startBucketNumber = bucketNumber(schemaManager,
start.schemaId());
+ int endBucketNumber = bucketNumber(schemaManager, end.schemaId());
+ if (startBucketNumber != endBucketNumber) {
+ throw new InconsistentTagBucketException(
+ start.id(),
+ end.id(),
+ String.format(
+ "The bucket number of two tags are different
(%s, %s), which is not supported in incremental tag query.",
+ startBucketNumber, endBucketNumber));
+ }
}
}
@@ -144,4 +147,28 @@ public class TimeTravelUtil {
TableSchema schema = schemaManager.schema(schemaId);
return CoreOptions.fromMap(schema.options()).bucket();
}
+
+ /**
+ * Exception thrown when the bucket numbers of the two tags differ in an incremental tag query.
+ */
+ public static class InconsistentTagBucketException extends
RuntimeException {
+
+ private final long startSnapshotId;
+ private final long endSnapshotId;
+
+ public InconsistentTagBucketException(
+ long startSnapshotId, long endSnapshotId, String message) {
+ super(message);
+ this.startSnapshotId = startSnapshotId;
+ this.endSnapshotId = endSnapshotId;
+ }
+
+ public long startSnapshotId() {
+ return startSnapshotId;
+ }
+
+ public long endSnapshotId() {
+ return endSnapshotId;
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index d9a5cab1d4..486bfcb69b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;
@@ -676,7 +677,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s)
*/", option))
.satisfies(
anyCauseMatches(
- IllegalArgumentException.class,
+
TimeTravelUtil.InconsistentTagBucketException.class,
"The bucket number of two tags are
different (1, 2), which is not supported in incremental tag query."));
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 00759f663d..7e72abc4c9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -19,15 +19,19 @@
package org.apache.paimon.spark.catalyst.plans.logical
import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.SparkTable
import
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
+import org.apache.paimon.table.{DataTable, FileStoreTable}
+import
org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException
+import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistryBase
import
org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -83,11 +87,65 @@ object PaimonTableValuedFunctions {
val sparkTable = sparkCatalog.loadTable(ident)
val options = tvf.parseArgs(args.tail)
- DataSourceV2Relation.create(
- sparkTable,
- Some(sparkCatalog),
- Some(ident),
- new CaseInsensitiveStringMap(options.asJava))
+ usingSparkIncrementQuery(tvf, sparkTable, options) match {
+ case Some(snapshotIdPair: (Long, Long)) =>
+ sparkIncrementQuery(spark, sparkTable, sparkCatalog, ident, options,
snapshotIdPair)
+ case _ =>
+ DataSourceV2Relation.create(
+ sparkTable,
+ Some(sparkCatalog),
+ Some(ident),
+ new CaseInsensitiveStringMap(options.asJava))
+ }
+ }
+
+ private def usingSparkIncrementQuery(
+ tvf: PaimonTableValueFunction,
+ sparkTable: Table,
+ options: Map[String, String]): Option[(Long, Long)] = {
+ tvf.fnName match {
+ case INCREMENTAL_QUERY | INCREMENTAL_TO_AUTO_TAG =>
+ sparkTable match {
+ case SparkTable(fileStoreTable: DataTable) =>
+ try {
+
fileStoreTable.copy(options.asJava).asInstanceOf[DataTable].newScan().plan()
+ None
+ } catch {
+ case e: InconsistentTagBucketException =>
+ Some((e.startSnapshotId, e.endSnapshotId))
+ }
+ case _ => None
+ }
+ case _ => None
+ }
+ }
+
+ private def sparkIncrementQuery(
+ spark: SparkSession,
+ sparkTable: Table,
+ sparkCatalog: TableCatalog,
+ ident: Identifier,
+ options: Map[String, String],
+ snapshotIdPair: (Long, Long)): LogicalPlan = {
+ val filteredOptions =
+ options - CoreOptions.INCREMENTAL_BETWEEN.key -
CoreOptions.INCREMENTAL_TO_AUTO_TAG.key
+
+ def datasetOfSnapshot(snapshotId: Long) = {
+ val updatedOptions = filteredOptions + (CoreOptions.SCAN_VERSION.key()
-> snapshotId.toString)
+ createDataset(
+ spark,
+ DataSourceV2Relation.create(
+ sparkTable,
+ Some(sparkCatalog),
+ Some(ident),
+ new CaseInsensitiveStringMap(updatedOptions.asJava)
+ ))
+ }
+
+ datasetOfSnapshot(snapshotIdPair._2)
+ .except(datasetOfSnapshot(snapshotIdPair._1))
+ .queryExecution
+ .logical
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 0aa1829eee..addf846100 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -189,6 +189,62 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
}
}
+ test("Table Valued Functions: incremental query with inconsistent tag
bucket") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (a INT, b INT) USING paimon
+ |TBLPROPERTIES ('primary-key'='a', 'bucket' = '1')
+ |""".stripMargin)
+
+ val table = loadTable("t")
+
+ sql("INSERT INTO t VALUES (1, 11), (2, 22)")
+ table.createTag("2024-01-01", 1)
+
+ sql("ALTER TABLE t SET TBLPROPERTIES ('bucket' = '2')")
+ sql("INSERT OVERWRITE t SELECT * FROM t")
+
+ sql("INSERT INTO t VALUES (3, 33)")
+ table.createTag("2024-01-03", 3)
+
+ sql("DELETE FROM t WHERE a = 1")
+ table.createTag("2024-01-04", 4)
+
+ sql("UPDATE t SET b = 222 WHERE a = 2")
+ table.createTag("2024-01-05", 5)
+
+ checkAnswer(
+ sql(
+ "SELECT * FROM paimon_incremental_query('t', '2024-01-01',
'2024-01-03') ORDER BY a, b"),
+ Seq(Row(3, 33)))
+
+ checkAnswer(
+ sql("SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-01-03')
ORDER BY a, b"),
+ Seq(Row(3, 33)))
+
+ checkAnswer(
+ sql(
+ "SELECT * FROM paimon_incremental_query('t', '2024-01-01',
'2024-01-04') ORDER BY a, b"),
+ Seq(Row(3, 33)))
+
+ checkAnswer(
+ sql(
+ "SELECT * FROM paimon_incremental_query('t', '2024-01-01',
'2024-01-05') ORDER BY a, b"),
+ Seq(Row(2, 222), Row(3, 33)))
+
+ checkAnswer(
+ sql(
+ "SELECT * FROM paimon_incremental_query('`t$audit_log`',
'2024-01-01', '2024-01-04') ORDER BY a, b"),
+ Seq(Row("-D", 1, 11), Row("+I", 3, 33)))
+
+ checkAnswer(
+ sql(
+ "SELECT * FROM paimon_incremental_query('`t$audit_log`',
'2024-01-01', '2024-01-05') ORDER BY a, b"),
+ Seq(Row("-D", 1, 11), Row("+U", 2, 222), Row("+I", 3, 33))
+ )
+ }
+ }
+
private def incrementalDF(tableIdent: String, start: Int, end: Int):
DataFrame = {
spark.read
.format("paimon")