This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4812fd2b21 [spark] Fallback to spark except query if increment query with rescale bucket (#4989)
4812fd2b21 is described below

commit 4812fd2b21db12bc502315aa483087473402ecb5
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 23 19:21:14 2025 +0800

    [spark] Fallback to spark except query if increment query with rescale bucket (#4989)
---
 .../snapshot/IncrementalTagStartingScanner.java    |  4 +-
 .../table/source/snapshot/TimeTravelUtil.java      | 45 +++++++++++---
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  3 +-
 .../plans/logical/PaimonTableValuedFunctions.scala | 70 ++++++++++++++++++++--
 .../spark/sql/TableValuedFunctionsTest.scala       | 56 +++++++++++++++++
 5 files changed, 160 insertions(+), 18 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 835b7595a3..388b36fb28 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -57,8 +57,8 @@ public class IncrementalTagStartingScanner extends 
AbstractStartingScanner {
                         snapshotManager.fileIO(),
                         snapshotManager.tablePath(),
                         snapshotManager.branch()),
-                start.schemaId(),
-                end.schemaId());
+                start,
+                end);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 4a0f4290df..d6231721de 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -128,15 +128,18 @@ public class TimeTravelUtil {
     }
 
     public static void checkRescaleBucketForIncrementalTagQuery(
-            SchemaManager schemaManager, long schemaId1, long schemaId2) {
-        if (schemaId1 != schemaId2) {
-            int bucketNumber1 = bucketNumber(schemaManager, schemaId1);
-            int bucketNumber2 = bucketNumber(schemaManager, schemaId2);
-            checkArgument(
-                    bucketNumber1 == bucketNumber2,
-                    "The bucket number of two tags are different (%s, %s), 
which is not supported in incremental tag query.",
-                    bucketNumber1,
-                    bucketNumber2);
+            SchemaManager schemaManager, Snapshot start, Snapshot end) {
+        if (start.schemaId() != end.schemaId()) {
+            int startBucketNumber = bucketNumber(schemaManager, 
start.schemaId());
+            int endBucketNumber = bucketNumber(schemaManager, end.schemaId());
+            if (startBucketNumber != endBucketNumber) {
+                throw new InconsistentTagBucketException(
+                        start.id(),
+                        end.id(),
+                        String.format(
+                                "The bucket number of two tags are different 
(%s, %s), which is not supported in incremental tag query.",
+                                startBucketNumber, endBucketNumber));
+            }
         }
     }
 
@@ -144,4 +147,28 @@ public class TimeTravelUtil {
         TableSchema schema = schemaManager.schema(schemaId);
         return CoreOptions.fromMap(schema.options()).bucket();
     }
+
+    /**
+     * Exception thrown when the bucket number of two tags are different in 
incremental tag query.
+     */
+    public static class InconsistentTagBucketException extends 
RuntimeException {
+
+        private final long startSnapshotId;
+        private final long endSnapshotId;
+
+        public InconsistentTagBucketException(
+                long startSnapshotId, long endSnapshotId, String message) {
+            super(message);
+            this.startSnapshotId = startSnapshotId;
+            this.endSnapshotId = endSnapshotId;
+        }
+
+        public long startSnapshotId() {
+            return startSnapshotId;
+        }
+
+        public long endSnapshotId() {
+            return endSnapshotId;
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index d9a5cab1d4..486bfcb69b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SnapshotNotExistException;
@@ -676,7 +677,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
             assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) 
*/", option))
                     .satisfies(
                             anyCauseMatches(
-                                    IllegalArgumentException.class,
+                                    
TimeTravelUtil.InconsistentTagBucketException.class,
                                     "The bucket number of two tags are 
different (1, 2), which is not supported in incremental tag query."));
         }
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 00759f663d..7e72abc4c9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -19,15 +19,19 @@
 package org.apache.paimon.spark.catalyst.plans.logical
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.SparkTable
 import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
+import org.apache.paimon.table.{DataTable, FileStoreTable}
+import 
org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException
 
+import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistryBase
 import 
org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -83,11 +87,65 @@ object PaimonTableValuedFunctions {
     val sparkTable = sparkCatalog.loadTable(ident)
     val options = tvf.parseArgs(args.tail)
 
-    DataSourceV2Relation.create(
-      sparkTable,
-      Some(sparkCatalog),
-      Some(ident),
-      new CaseInsensitiveStringMap(options.asJava))
+    usingSparkIncrementQuery(tvf, sparkTable, options) match {
+      case Some(snapshotIdPair: (Long, Long)) =>
+        sparkIncrementQuery(spark, sparkTable, sparkCatalog, ident, options, 
snapshotIdPair)
+      case _ =>
+        DataSourceV2Relation.create(
+          sparkTable,
+          Some(sparkCatalog),
+          Some(ident),
+          new CaseInsensitiveStringMap(options.asJava))
+    }
+  }
+
+  private def usingSparkIncrementQuery(
+      tvf: PaimonTableValueFunction,
+      sparkTable: Table,
+      options: Map[String, String]): Option[(Long, Long)] = {
+    tvf.fnName match {
+      case INCREMENTAL_QUERY | INCREMENTAL_TO_AUTO_TAG =>
+        sparkTable match {
+          case SparkTable(fileStoreTable: DataTable) =>
+            try {
+              
fileStoreTable.copy(options.asJava).asInstanceOf[DataTable].newScan().plan()
+              None
+            } catch {
+              case e: InconsistentTagBucketException =>
+                Some((e.startSnapshotId, e.endSnapshotId))
+            }
+          case _ => None
+        }
+      case _ => None
+    }
+  }
+
+  private def sparkIncrementQuery(
+      spark: SparkSession,
+      sparkTable: Table,
+      sparkCatalog: TableCatalog,
+      ident: Identifier,
+      options: Map[String, String],
+      snapshotIdPair: (Long, Long)): LogicalPlan = {
+    val filteredOptions =
+      options - CoreOptions.INCREMENTAL_BETWEEN.key - 
CoreOptions.INCREMENTAL_TO_AUTO_TAG.key
+
+    def datasetOfSnapshot(snapshotId: Long) = {
+      val updatedOptions = filteredOptions + (CoreOptions.SCAN_VERSION.key() 
-> snapshotId.toString)
+      createDataset(
+        spark,
+        DataSourceV2Relation.create(
+          sparkTable,
+          Some(sparkCatalog),
+          Some(ident),
+          new CaseInsensitiveStringMap(updatedOptions.asJava)
+        ))
+    }
+
+    datasetOfSnapshot(snapshotIdPair._2)
+      .except(datasetOfSnapshot(snapshotIdPair._1))
+      .queryExecution
+      .logical
   }
 }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 0aa1829eee..addf846100 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -189,6 +189,62 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }
 
+  test("Table Valued Functions: incremental query with inconsistent tag 
bucket") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (a INT, b INT) USING paimon
+            |TBLPROPERTIES ('primary-key'='a', 'bucket' = '1')
+            |""".stripMargin)
+
+      val table = loadTable("t")
+
+      sql("INSERT INTO t VALUES (1, 11), (2, 22)")
+      table.createTag("2024-01-01", 1)
+
+      sql("ALTER TABLE t SET TBLPROPERTIES ('bucket' = '2')")
+      sql("INSERT OVERWRITE t SELECT * FROM t")
+
+      sql("INSERT INTO t VALUES (3, 33)")
+      table.createTag("2024-01-03", 3)
+
+      sql("DELETE FROM t WHERE a = 1")
+      table.createTag("2024-01-04", 4)
+
+      sql("UPDATE t SET b = 222 WHERE a = 2")
+      table.createTag("2024-01-05", 5)
+
+      checkAnswer(
+        sql(
+          "SELECT * FROM paimon_incremental_query('t', '2024-01-01', 
'2024-01-03') ORDER BY a, b"),
+        Seq(Row(3, 33)))
+
+      checkAnswer(
+        sql("SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-01-03') 
ORDER BY a, b"),
+        Seq(Row(3, 33)))
+
+      checkAnswer(
+        sql(
+          "SELECT * FROM paimon_incremental_query('t', '2024-01-01', 
'2024-01-04') ORDER BY a, b"),
+        Seq(Row(3, 33)))
+
+      checkAnswer(
+        sql(
+          "SELECT * FROM paimon_incremental_query('t', '2024-01-01', 
'2024-01-05') ORDER BY a, b"),
+        Seq(Row(2, 222), Row(3, 33)))
+
+      checkAnswer(
+        sql(
+          "SELECT * FROM paimon_incremental_query('`t$audit_log`', 
'2024-01-01', '2024-01-04') ORDER BY a, b"),
+        Seq(Row("-D", 1, 11), Row("+I", 3, 33)))
+
+      checkAnswer(
+        sql(
+          "SELECT * FROM paimon_incremental_query('`t$audit_log`', 
'2024-01-01', '2024-01-05') ORDER BY a, b"),
+        Seq(Row("-D", 1, 11), Row("+U", 2, 222), Row("+I", 3, 33))
+      )
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): 
DataFrame = {
     spark.read
       .format("paimon")

Reply via email to