This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fcdbbefaf [spark] Introduce paimon_incremental_between_timestamp and 
paimon_incremental_to_auto_tag tvf (#4855)
5fcdbbefaf is described below

commit 5fcdbbefafa330d5e15bb9d6fa9213b30baab764
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 7 22:51:26 2025 +0800

    [spark] Introduce paimon_incremental_between_timestamp and 
paimon_incremental_to_auto_tag tvf (#4855)
---
 docs/content/spark/sql-query.md                    |  10 +-
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   9 +-
 paimon-docs/README.md                              |   2 +-
 .../plans/logical/PaimonTableValuedFunctions.scala |  65 ++++++++++--
 .../apache/paimon/spark/PaimonHiveTestBase.scala   |   5 +
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |   4 +-
 .../spark/sql/TableValuedFunctionsTest.scala       | 112 +++++++++++++++++++++
 8 files changed, 193 insertions(+), 22 deletions(-)

diff --git a/docs/content/spark/sql-query.md b/docs/content/spark/sql-query.md
index e118b8418b..c97b6d3341 100644
--- a/docs/content/spark/sql-query.md
+++ b/docs/content/spark/sql-query.md
@@ -77,11 +77,17 @@ You can also force specifying 
`'incremental-between-scan-mode'`.
 
 Paimon supports that use Spark SQL to do the incremental query that 
implemented by Spark Table Valued Function.
 
-you can use `paimon_incremental_query` in query to extract the incremental 
data:
-
 ```sql
 -- read the incremental data between snapshot id 12 and snapshot id 20.
 SELECT * FROM paimon_incremental_query('tableName', 12, 20);
+
+-- read the incremental data between ts 1692169900000 and ts 1692169900000.
+SELECT * FROM paimon_incremental_between_timestamp('tableName', 
'1692169000000', '1692169900000');
+
+-- read the incremental data to tag '2024-12-04'.
+-- Paimon will find an earlier tag and return changes between them.
+-- If the tag doesn't exist or the earlier tag doesn't exist, return empty.
+SELECT * FROM paimon_incremental_to_auto_tag('tableName', '2024-12-04');
 ```
 
 In Batch SQL, the `DELETE` records are not allowed to be returned, so records 
of `-D` will be dropped.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 7fdb43b8b0..d135b6060a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -396,25 +396,25 @@ under the License.
             <td><h5>incremental-between</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Read incremental changes between start snapshot (exclusive) 
and end snapshot, for example, '5,10' means changes between snapshot 5 and 
snapshot 10.</td>
+            <td>Read incremental changes between start snapshot (exclusive) 
and end snapshot (inclusive), for example, '5,10' means changes between 
snapshot 5 and snapshot 10.</td>
         </tr>
         <tr>
             <td><h5>incremental-between-scan-mode</h5></td>
             <td style="word-wrap: break-word;">auto</td>
             <td><p>Enum</p></td>
-            <td>Scan kind when Read incremental changes between start snapshot 
(exclusive) and end snapshot. <br /><br />Possible values:<ul><li>"auto": Scan 
changelog files for the table which produces changelog files. Otherwise, scan 
newly changed files.</li><li>"delta": Scan newly changed files between 
snapshots.</li><li>"changelog": Scan changelog files between 
snapshots.</li></ul></td>
+            <td>Scan kind when Read incremental changes between start snapshot 
(exclusive) and end snapshot (inclusive). <br /><br />Possible 
values:<ul><li>"auto": Scan changelog files for the table which produces 
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan 
newly changed files between snapshots.</li><li>"changelog": Scan changelog 
files between snapshots.</li></ul></td>
         </tr>
         <tr>
             <td><h5>incremental-between-timestamp</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Read incremental changes between start timestamp (exclusive) 
and end timestamp, for example, 't1,t2' means changes between timestamp t1 and 
timestamp t2.</td>
+            <td>Read incremental changes between start timestamp (exclusive) 
and end timestamp (inclusive), for example, 't1,t2' means changes between 
timestamp t1 and timestamp t2.</td>
         </tr>
         <tr>
             <td><h5>incremental-to-auto-tag</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Used to specify the auto-created tag to reading incremental 
changes.</td>
+            <td>Used to specify the end tag (inclusive), and Paimon will find 
an earlier tag and return changes between them. If the tag doesn't exist or the 
earlier tag doesn't exist, return empty. </td>
         </tr>
         <tr>
             <td><h5>local-merge-buffer-size</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3d6b0b5490..acf082177b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1089,7 +1089,7 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Read incremental changes between start snapshot 
(exclusive) and end snapshot, "
+                            "Read incremental changes between start snapshot 
(exclusive) and end snapshot (inclusive), "
                                     + "for example, '5,10' means changes 
between snapshot 5 and snapshot 10.");
 
     public static final ConfigOption<IncrementalBetweenScanMode> 
INCREMENTAL_BETWEEN_SCAN_MODE =
@@ -1097,14 +1097,14 @@ public class CoreOptions implements Serializable {
                     .enumType(IncrementalBetweenScanMode.class)
                     .defaultValue(IncrementalBetweenScanMode.AUTO)
                     .withDescription(
-                            "Scan kind when Read incremental changes between 
start snapshot (exclusive) and end snapshot. ");
+                            "Scan kind when Read incremental changes between 
start snapshot (exclusive) and end snapshot (inclusive). ");
 
     public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
             key("incremental-between-timestamp")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Read incremental changes between start timestamp 
(exclusive) and end timestamp, "
+                            "Read incremental changes between start timestamp 
(exclusive) and end timestamp (inclusive), "
                                     + "for example, 't1,t2' means changes 
between timestamp t1 and timestamp t2.");
 
     public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
@@ -1112,7 +1112,8 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Used to specify the auto-created tag to reading 
incremental changes.");
+                            "Used to specify the end tag (inclusive), and 
Paimon will find an earlier tag and return changes between them. "
+                                    + "If the tag doesn't exist or the earlier 
tag doesn't exist, return empty. ");
 
     public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE 
=
             key("end-input.check-partition-expire")
diff --git a/paimon-docs/README.md b/paimon-docs/README.md
index 76d922c882..cb1d475869 100644
--- a/paimon-docs/README.md
+++ b/paimon-docs/README.md
@@ -28,7 +28,7 @@ The `@ConfigGroups` annotation can be used to generate 
multiple files from a sin
 
 To integrate an `*Options` class from another package, add another 
module-package argument pair to `ConfigOptionsDocGenerator#LOCATIONS`.
 
-The files can be generated by running `mvn clean install -DskipTests` and `mvn 
package -Pgenerate-docs -pl paimon-docs -nsu -DskipTests`, and can be 
integrated into the documentation using `{{ include generated/<file-name> >}}`.
+The files can be generated by running `mvn package -Pgenerate-docs -pl 
paimon-docs -nsu -DskipTests -am`, and can be integrated into the documentation 
using `{{ include generated/<file-name> >}}`.
 
 **NOTE:** You need to make sure that the changed jar has been installed in the 
local maven repository.
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 6edbf533cb..00759f663d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.catalyst.plans.logical
 
 import org.apache.paimon.CoreOptions
+import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
@@ -35,8 +36,11 @@ import scala.collection.JavaConverters._
 object PaimonTableValuedFunctions {
 
   val INCREMENTAL_QUERY = "paimon_incremental_query"
+  val INCREMENTAL_BETWEEN_TIMESTAMP = "paimon_incremental_between_timestamp"
+  val INCREMENTAL_TO_AUTO_TAG = "paimon_incremental_to_auto_tag"
 
-  val supportedFnNames: Seq[String] = Seq(INCREMENTAL_QUERY)
+  val supportedFnNames: Seq[String] =
+    Seq(INCREMENTAL_QUERY, INCREMENTAL_BETWEEN_TIMESTAMP, 
INCREMENTAL_TO_AUTO_TAG)
 
   private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, 
TableFunctionBuilder)
 
@@ -44,6 +48,10 @@ object PaimonTableValuedFunctions {
     val (info, builder) = fnName match {
       case INCREMENTAL_QUERY =>
         FunctionRegistryBase.build[IncrementalQuery](fnName, since = None)
+      case INCREMENTAL_BETWEEN_TIMESTAMP =>
+        FunctionRegistryBase.build[IncrementalBetweenTimestamp](fnName, since 
= None)
+      case INCREMENTAL_TO_AUTO_TAG =>
+        FunctionRegistryBase.build[IncrementalToAutoTag](fnName, since = None)
       case _ =>
         throw new Exception(s"Function $fnName isn't a supported table valued 
function.")
     }
@@ -58,12 +66,23 @@ object PaimonTableValuedFunctions {
 
     val sessionState = spark.sessionState
     val catalogManager = sessionState.catalogManager
-    val sparkCatalog = catalogManager.currentCatalog.asInstanceOf[TableCatalog]
-    val tableId = 
sessionState.sqlParser.parseTableIdentifier(args.head.eval().toString)
-    val namespace = 
tableId.database.map(Array(_)).getOrElse(catalogManager.currentNamespace)
-    val ident = Identifier.of(namespace, tableId.table)
+
+    val identifier = args.head.eval().toString
+    val (catalogName, dbName, tableName) = {
+      sessionState.sqlParser.parseMultipartIdentifier(identifier) match {
+        case Seq(table) =>
+          (catalogManager.currentCatalog.name(), 
catalogManager.currentNamespace.head, table)
+        case Seq(db, table) => (catalogManager.currentCatalog.name(), db, 
table)
+        case Seq(catalog, db, table) => (catalog, db, table)
+        case _ => throw new RuntimeException(s"Invalid table identifier: 
$identifier")
+      }
+    }
+
+    val sparkCatalog = 
catalogManager.catalog(catalogName).asInstanceOf[TableCatalog]
+    val ident: Identifier = Identifier.of(Array(dbName), tableName)
     val sparkTable = sparkCatalog.loadTable(ident)
     val options = tvf.parseArgs(args.tail)
+
     DataSourceV2Relation.create(
       sparkTable,
       Some(sparkCatalog),
@@ -87,20 +106,46 @@ abstract class PaimonTableValueFunction(val fnName: 
String) extends LeafNode {
   val args: Seq[Expression]
 
   def parseArgs(args: Seq[Expression]): Map[String, String]
-
 }
 
-/** Plan for the "paimon_incremental_query" function */
+/** Plan for the [[INCREMENTAL_QUERY]] function */
 case class IncrementalQuery(override val args: Seq[Expression])
-  extends 
PaimonTableValueFunction(PaimonTableValuedFunctions.INCREMENTAL_QUERY) {
+  extends PaimonTableValueFunction(INCREMENTAL_QUERY) {
 
   override def parseArgs(args: Seq[Expression]): Map[String, String] = {
     assert(
-      args.size >= 1 && args.size <= 2,
-      "paimon_incremental_query needs two parameters: startSnapshotId, and 
endSnapshotId.")
+      args.size == 2,
+      s"$INCREMENTAL_QUERY needs two parameters: startSnapshotId, and 
endSnapshotId.")
 
     val start = args.head.eval().toString
     val end = args.last.eval().toString
     Map(CoreOptions.INCREMENTAL_BETWEEN.key -> s"$start,$end")
   }
 }
+
+/** Plan for the [[INCREMENTAL_BETWEEN_TIMESTAMP]] function */
+case class IncrementalBetweenTimestamp(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(INCREMENTAL_BETWEEN_TIMESTAMP) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    assert(
+      args.size == 2,
+      s"$INCREMENTAL_BETWEEN_TIMESTAMP needs two parameters: startTimestamp, 
and endTimestamp.")
+
+    val start = args.head.eval().toString
+    val end = args.last.eval().toString
+    Map(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key -> s"$start,$end")
+  }
+}
+
+/** Plan for the [[INCREMENTAL_TO_AUTO_TAG]] function */
+case class IncrementalToAutoTag(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(INCREMENTAL_TO_AUTO_TAG) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    assert(args.size == 1, s"$INCREMENTAL_TO_AUTO_TAG needs one parameter: 
endTagName.")
+
+    val endTagName = args.head.eval().toString
+    Map(CoreOptions.INCREMENTAL_TO_AUTO_TAG.key -> endTagName)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index 6d2ffea04d..d4d888f2c1 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.hive.TestHiveMetastore
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
@@ -78,6 +79,10 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
     spark.sql(s"USE $sparkCatalogName")
     spark.sql(s"USE $hiveDbName")
   }
+
+  override def loadTable(tableName: String): FileStoreTable = {
+    loadTable(hiveDbName, tableName)
+  }
 }
 
 object PaimonHiveTestBase {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 9a6719010e..9a1647da81 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -38,7 +38,7 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import java.io.File
-import java.util.TimeZone
+import java.util.{TimeZone, UUID}
 
 import scala.util.Random
 
@@ -48,6 +48,8 @@ class PaimonSparkTestBase
   with WithTableOptions
   with SparkVersionSupport {
 
+  protected lazy val commitUser: String = UUID.randomUUID.toString
+
   protected lazy val fileIO: FileIO = LocalFileIO.create
 
   protected lazy val tempDBDir: File = Utils.createTempDir
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index b9c187b83a..0aa1829eee 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.data.{BinaryString, GenericRow, Timestamp}
+import org.apache.paimon.manifest.ManifestCommittable
 import org.apache.paimon.spark.PaimonHiveTestBase
 
 import org.apache.spark.sql.{DataFrame, Row}
 
+import java.time.LocalDateTime
+import java.util.Collections
+
 class TableValuedFunctionsTest extends PaimonHiveTestBase {
 
   withPk.foreach {
@@ -91,10 +96,117 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
       }
   }
 
+  test("Table Valued Functions: paimon_incremental_between_timestamp") {
+    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        sql(s"USE $catalogName")
+        val dbName = "test_tvf_db"
+        withDatabase(dbName) {
+          sql(s"CREATE DATABASE $dbName")
+          withTable("t") {
+            sql(s"USE $dbName")
+            sql("CREATE TABLE t (id INT) USING paimon")
+
+            sql("INSERT INTO t VALUES 1")
+            Thread.sleep(100)
+            val t1 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 2")
+            Thread.sleep(100)
+            val t2 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 3")
+            sql("INSERT INTO t VALUES 4")
+            Thread.sleep(100)
+            val t3 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 5")
+
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('t', 
'$t1', '$t2') ORDER BY id"),
+              Seq(Row(2)))
+            checkAnswer(
+              sql(
+                s"SELECT * FROM 
paimon_incremental_between_timestamp('$dbName.t', '$t2', '$t3') ORDER BY id"),
+              Seq(Row(3), Row(4)))
+            checkAnswer(
+              sql(
+                s"SELECT * FROM 
paimon_incremental_between_timestamp('$catalogName.$dbName.t', '$t1', '$t3') 
ORDER BY id"),
+              Seq(Row(2), Row(3), Row(4)))
+          }
+        }
+    }
+  }
+
+  test("Table Valued Functions: paimon_incremental_to_auto_tag") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (a INT, b STRING) USING paimon
+            |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1', 
'tag.automatic-creation'='watermark', 'tag.creation-period'='daily')
+            |""".stripMargin)
+
+      val table = loadTable("t")
+      val write = table.newWrite(commitUser)
+      val commit = table.newCommit(commitUser).ignoreEmptyCommit(false)
+
+      write.write(GenericRow.of(1, BinaryString.fromString("a")))
+      var commitMessages = write.prepareCommit(false, 0)
+      commit.commit(
+        new ManifestCommittable(
+          0,
+          utcMills("2024-12-02T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      write.write(GenericRow.of(2, BinaryString.fromString("b")))
+      commitMessages = write.prepareCommit(false, 1)
+      commit.commit(
+        new ManifestCommittable(
+          1,
+          utcMills("2024-12-03T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      write.write(GenericRow.of(3, BinaryString.fromString("c")))
+      commitMessages = write.prepareCommit(false, 2)
+      commit.commit(
+        new ManifestCommittable(
+          2,
+          utcMills("2024-12-05T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-01') 
ORDER BY a"),
+        Seq())
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-02') 
ORDER BY a"),
+        Seq(Row(2, "b")))
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-03') 
ORDER BY a"),
+        Seq())
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-04') 
ORDER BY a"),
+        Seq(Row(3, "c")))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): 
DataFrame = {
     spark.read
       .format("paimon")
       .option("incremental-between", s"$start,$end")
       .table(tableIdent)
   }
+
+  private def utcMills(timestamp: String) =
+    Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond
+
+  object GenericRow {
+    def of(values: Any*): GenericRow = {
+      val row = new GenericRow(values.length)
+      values.zipWithIndex.foreach {
+        case (value, index) =>
+          row.setField(index, value)
+      }
+      row
+    }
+  }
 }

Reply via email to