This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 243098c39af [HUDI-6223] Support Incremental read via Spark SQL table
valued function (#8729)
243098c39af is described below
commit 243098c39afffd79d881c056282016a2732eaabc
Author: kazdy <[email protected]>
AuthorDate: Tue Jun 20 04:05:51 2023 +0200
[HUDI-6223] Support Incremental read via Spark SQL table valued function
(#8729)
* add hudi_table_changes TVF
* add "earliest" start instant option to hudi_table_changes TVF
* add HoodieTableChanges to catalyst plan analysis
* the first param of hudi_table_changes is polymorphic; it can be the table
id or table path
---------
Co-authored-by: Danny Chan <[email protected]>
---
.../hudi/functional/TestSparkSqlCoreFlow.scala | 16 +-
.../sql/hudi/TestHoodieTableValuedFunction.scala | 265 +++++++++++++++++++++
.../catalyst/plans/logcal/HoodieTableChanges.scala | 82 +++++++
.../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 28 ++-
.../sql/hudi/analysis/TableValuedFunctions.scala | 7 +-
5 files changed, 379 insertions(+), 19 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
index cf36f4b258f..fa883cd3eb2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -121,16 +121,12 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
snapshotDf3.unpersist(true)
- // Read Incremental Query, need to use spark-ds because functionality does
not exist for spark sql
+ // Read Incremental Query, uses hudi_table_changes() table valued function
for spark sql
// we have 2 commits, try pulling the first commit (which is not the
latest)
//HUDI-5266
val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
- val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE.key,
QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
- .load(tableBasePath)
- //val hoodieIncViewDf1 = doIncRead(tableName, isMetadataEnabledOnRead,
"000", firstCommit)
+ val hoodieIncViewDf1 = spark.sql(s"select * from
hudi_table_changes('$tableName', 'earliest', '$firstCommit')")
+
assertEquals(100, hoodieIncViewDf1.count()) // 100 initial inserts must be
pulled
var countsPerCommit =
hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
@@ -141,11 +137,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
//another incremental query with commit2 and commit3
//HUDI-5266
- val hoodieIncViewDf2 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE.key,
QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
- .option(DataSourceReadOptions.END_INSTANTTIME.key(), commitInstantTime3)
- .load(tableBasePath)
+ val hoodieIncViewDf2 = spark.sql(s"select * from
hudi_table_changes('$tableName', '$commitInstantTime2', '$commitInstantTime3')")
assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must
be pulled
countsPerCommit =
hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index 09aa9a50456..1b0aa3ff6e1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.sql.functions.{col, from_json}
class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
@@ -80,4 +81,268 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
}
}
}
+
+ test(s"Test hudi_table_changes latest_state") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq(
+ ("cow", true),
+ ("mor", true),
+ ("cow", false),
+ ("mor", false)
+ ).foreach { parameters =>
+ val tableType = parameters._1
+ val isTableId = parameters._2
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ val identifier = if (isTableId) tableName else tablePath
+ spark.sql("set hoodie.sql.insert.mode = non-strict")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 10, 1000), (2, 'a2', 20, 1000), (3, 'a3',
30, 1000)
+ | """.stripMargin
+ )
+
+ val firstInstant = spark.sql(s"select min(_hoodie_commit_time) as
commitTime from $tableName order by commitTime").first().getString(0)
+
+ checkAnswer(
+ s"""select id,
+ |name,
+ |price,
+ |ts
+ |from hudi_table_changes('$identifier', 'latest_state',
'earliest')
+ |""".stripMargin
+ )(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 20.0, 1000),
+ Seq(3, "a3", 30.0, 1000)
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3,
'a3_3', 30, 1100)
+ | """.stripMargin
+ )
+ val secondInstant = spark.sql(s"select max(_hoodie_commit_time) as
commitTime from $tableName order by commitTime").first().getString(0)
+
+ checkAnswer(
+ s"""select id,
+ |name,
+ |price,
+ |ts
+ |from hudi_table_changes(
+ |'$identifier',
+ |'latest_state',
+ |'$firstInstant')
+ |""".stripMargin
+ )(
+ Seq(1, "a1_1", 10.0, 1100),
+ Seq(2, "a2_2", 20.0, 1100),
+ Seq(3, "a3_3", 30.0, 1100)
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1_1', 10, 1200), (2, 'a2_2', 20, 1200), (3,
'a3_3', 30, 1200)
+ | """.stripMargin
+ )
+
+ // should not include the first and latest instant
+ checkAnswer(
+ s"""select id,
+ | name,
+ | price,
+ | ts
+ | from hudi_table_changes(
+ | '$identifier',
+ | 'latest_state',
+ | '$firstInstant',
+ | '$secondInstant')
+ | """.stripMargin
+ )(
+ Seq(1, "a1_1", 10.0, 1100),
+ Seq(2, "a2_2", 20.0, 1100),
+ Seq(3, "a3_3", 30.0, 1100)
+ )
+ }
+ }
+ }
+ }
+
+ test(s"Test hudi_table_changes cdc") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq(
+ ("cow", true),
+ ("mor", true),
+ ("cow", false),
+ ("mor", false)
+ ).foreach { parameters =>
+ val tableType = parameters._1
+ val isTableId = parameters._2
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ val identifier = if (isTableId) tableName else tablePath
+ spark.sql("set hoodie.sql.insert.mode = upsert")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | 'hoodie.table.cdc.enabled' = 'true',
+ | 'hoodie.table.cdc.supplemental.logging.mode' =
'data_before_after'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 10, 1000), (2, 'a2', 20, 1000), (3, 'a3',
30, 1000)
+ | """.stripMargin
+ )
+ val originSchema = spark.read.format("hudi").load(tablePath).schema
+ val firstInstant = spark.sql(s"select min(_hoodie_commit_time) as
commitTime from $tableName order by commitTime").first().getString(0)
+
+ val cdcDataOnly1 = spark.sql(
+ s"""select
+ | op,
+ | before,
+ | after
+ |from hudi_table_changes('$identifier', 'cdc', 'earliest')
+ |""".stripMargin
+ )
+
+ val change1 = cdcDataOnly1.select(
+ col("op"),
+ col("before"),
+ from_json(col("after"), originSchema).as("after")
+ ).select(
+ col("op"),
+ col("before"),
+ col("after.id"),
+ col("after.name"),
+ col("after.price"),
+ col("after.ts")
+ ).orderBy("after.id").collect()
+ checkAnswer(change1)(
+ Seq("i", null, 1, "a1", 10.0, 1000),
+ Seq("i", null, 2, "a2", 20.0, 1000),
+ Seq("i", null, 3, "a3", 30.0, 1000)
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3,
'a3_3', 30, 1100)
+ | """.stripMargin
+ )
+ val secondInstant = spark.sql(s"select max(_hoodie_commit_time) as
commitTime from $tableName order by commitTime").first().getString(0)
+
+ val cdcDataOnly2 = spark.sql(
+ s"""select
+ | op,
+ | before,
+ | after
+ |from hudi_table_changes(
+ |'$identifier',
+ |'cdc',
+ |'$firstInstant')
+ |""".stripMargin
+ )
+
+ val change2 = cdcDataOnly2.select(
+ col("op"),
+ from_json(col("before"), originSchema).as("before"),
+ from_json(col("after"), originSchema).as("after")
+ ).select(
+ col("op"),
+ col("before.id"),
+ col("before.name"),
+ col("before.price"),
+ col("before.ts"),
+ col("after.id"),
+ col("after.name"),
+ col("after.price"),
+ col("after.ts")
+ ).orderBy("after.id").collect()
+ checkAnswer(change2)(
+ Seq("u", 1, "a1", 10.0, 1000, 1, "a1_1", 10.0, 1100),
+ Seq("u", 2, "a2", 20.0, 1000, 2, "a2_2", 20.0, 1100),
+ Seq("u", 3, "a3", 30.0, 1000, 3, "a3_3", 30.0, 1100)
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1_1', 11, 1200), (2, 'a2_2', 21, 1200), (3,
'a3_3', 31, 1200)
+ | """.stripMargin
+ )
+
+ // should not include the first and latest instant
+ val cdcDataOnly3 = spark.sql(
+ s"""select
+ | op,
+ | before,
+ | after
+ | from hudi_table_changes(
+ | '$identifier',
+ | 'cdc',
+ | '$firstInstant',
+ | '$secondInstant')
+ | """.stripMargin
+ )
+
+ val change3 = cdcDataOnly3.select(
+ col("op"),
+ from_json(col("before"), originSchema).as("before"),
+ from_json(col("after"), originSchema).as("after")
+ ).select(
+ col("op"),
+ col("before.id"),
+ col("before.name"),
+ col("before.price"),
+ col("before.ts"),
+ col("after.id"),
+ col("after.name"),
+ col("after.price"),
+ col("after.ts")
+ ).orderBy("after.id").collect()
+ checkAnswer(change3)(
+ Seq("u", 1, "a1", 10.0, 1000, 1, "a1_1", 10.0, 1100),
+ Seq("u", 2, "a2", 20.0, 1000, 2, "a2_2", 20.0, 1100),
+ Seq("u", 3, "a3", 30.0, 1000, 3, "a3_3", 30.0, 1100)
+ )
+ }
+ }
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieTableChanges.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieTableChanges.scala
new file mode 100644
index 00000000000..8199e18918b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieTableChanges.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logcal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object HoodieTableChangesOptionsParser {
+ def parseOptions(exprs: Seq[Expression], funcName: String): (String,
Map[String, String]) = {
+ val args = exprs.map(_.eval().toString)
+
+ if (args.size < 3 || args.size > 4) {
+ throw new AnalysisException(s"Expect arguments (table_name or
table_path, incremental_format, start_instant, [end_instant]) for function
`$funcName`")
+ }
+
+ val identifier = args.head
+ val incrementalQueryFormat = args(1)
+ val startInstantTime = args(2)
+ val endInstantTime = args.drop(3).headOption
+
+ val incrementalQueryTypeOpt = Map("hoodie.datasource.query.type" ->
"incremental")
+
+ val incrementalQueryFormatOpt = incrementalQueryFormat match {
+ case "latest_state" | "cdc" =>
Map("hoodie.datasource.query.incremental.format" -> incrementalQueryFormat)
+ case _ => throw new AnalysisException(s"'hudi_table_changes' doesn't
support `$incrementalQueryFormat`")
+ }
+
+ val startInstantTimeOpt = startInstantTime match {
+ case "earliest" => Map("hoodie.datasource.read.begin.instanttime" ->
"000")
+ case _ => Map("hoodie.datasource.read.begin.instanttime" ->
startInstantTime)
+ }
+
+ val endInstantTimeOpt = endInstantTime match {
+ case Some(x) => Map("hoodie.datasource.read.end.instanttime" -> x)
+ case None => Map.empty[String, String]
+ }
+
+ val opts: Map[String, String] = incrementalQueryTypeOpt ++
incrementalQueryFormatOpt ++ startInstantTimeOpt ++ endInstantTimeOpt
+
+ (identifier, opts)
+ }
+
+}
+
+
+case class HoodieTableChanges(args: Seq[Expression]) extends LeafNode {
+
+ override def output: Seq[Attribute] = Nil
+
+ override lazy val resolved: Boolean = false
+
+}
+
+object HoodieTableChanges {
+
+ val FUNC_NAME = "hudi_table_changes";
+
+}
+
+case class HoodieTableChangesByPath(args: Seq[Expression]) extends LeafNode {
+
+ override def output: Seq[Attribute] = Nil
+
+ override lazy val resolved: Boolean = false
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index 58841f19df2..fc4d5d18ff2 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -17,12 +17,12 @@
package org.apache.spark.sql.hudi.analysis
+import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, DefaultSource,
SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
-import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery.parseOptions
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery,
HoodieTableChanges, HoodieTableChangesByPath, HoodieTableChangesOptionsParser}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
@@ -54,12 +54,12 @@ case class HoodieDataSourceV2ToV1Fallback(sparkSession:
SparkSession) extends Ru
// NOTE: Unfortunately, [[InsertIntoStatement]] is implemented in a way
that doesn't expose
// target relation as a child (even though there's no good reason
for that)
- case iis @ InsertIntoStatement(rv2 @ DataSourceV2Relation(v2Table:
HoodieInternalV2Table, _, _, _, _), _, _, _, _, _) =>
+ case iis@InsertIntoStatement(rv2@DataSourceV2Relation(v2Table:
HoodieInternalV2Table, _, _, _, _), _, _, _, _, _) =>
iis.copy(table = convertToV1(rv2, v2Table))
case _ =>
plan.resolveOperatorsDown {
- case rv2 @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _,
_, _) => convertToV1(rv2, v2Table)
+ case rv2@DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _,
_) => convertToV1(rv2, v2Table)
}
}
@@ -101,8 +101,8 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
LogicalRelation(relation, table)
- case q: HoodieQuery =>
- val (tableName, opts) = parseOptions(q.args)
+ case HoodieQuery(args) =>
+ val (tableName, opts) = HoodieQuery.parseOptions(args)
val tableId =
spark.sessionState.sqlParser.parseTableIdentifier(tableName)
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
@@ -112,6 +112,22 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
+
+ case HoodieTableChanges(args) =>
+ val (tablePath, opts) =
HoodieTableChangesOptionsParser.parseOptions(args, HoodieTableChanges.FUNC_NAME)
+ val hoodieDataSource = new DefaultSource
+ if (tablePath.contains(Path.SEPARATOR)) {
+ // the first param is table path
+ val relation = hoodieDataSource.createRelation(spark.sqlContext, opts
++ Map("path" -> tablePath))
+ LogicalRelation(relation)
+ } else {
+ // the first param is table identifier
+ val tableId =
spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
+ val relation = hoodieDataSource.createRelation(spark.sqlContext, opts
++ Map("path" ->
+ catalogTable.location.toString))
+ LogicalRelation(relation, catalogTable)
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
index d71ece5dcd0..7a7da9b2752 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery,
HoodieTableChanges}
object TableValuedFunctions {
@@ -28,6 +28,11 @@ object TableValuedFunctions {
FunctionIdentifier(HoodieQuery.FUNC_NAME),
new ExpressionInfo(HoodieQuery.getClass.getCanonicalName,
HoodieQuery.FUNC_NAME),
(args: Seq[Expression]) => new HoodieQuery(args)
+ ),
+ (
+ FunctionIdentifier(HoodieTableChanges.FUNC_NAME),
+ new ExpressionInfo(HoodieTableChanges.getClass.getCanonicalName,
HoodieTableChanges.FUNC_NAME),
+ (args: Seq[Expression]) => new HoodieTableChanges(args)
)
)
}