This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 108a885b4db [HUDI-7294] TVF to query hudi metadata (#10491)
108a885b4db is described below
commit 108a885b4db62f08d30ede47805b8b44c35ab1e6
Author: bhat-vinay <[email protected]>
AuthorDate: Wed Jan 17 08:21:07 2024 +0530
[HUDI-7294] TVF to query hudi metadata (#10491)
Adds a table-valued function (TVF) to query hudi metadata through spark-sql. Since the
metadata table is already a MOR table, the TVF simply creates a 'snapshot'
relation over that MOR table. Could not find any way to format (or filter) the RDD
generated by the MOR snapshot relation. Uploading the PR to get some feedback.
Co-authored-by: Vinaykumar Bhat <[email protected]>
---
.../sql/hudi/TestHoodieTableValuedFunction.scala | 68 ++++++++++++++++++++++
.../logcal/HoodieMetadataTableValuedFunction.scala | 46 +++++++++++++++
.../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 17 +++++-
.../sql/hudi/analysis/TableValuedFunctions.scala | 7 ++-
4 files changed, 136 insertions(+), 2 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index 867e83c301e..bdf512d3451 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -21,6 +21,8 @@ import
org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.functions.{col, from_json}
+import scala.collection.Seq
+
class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
test(s"Test hudi_query Table-Valued Function") {
@@ -558,4 +560,70 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
}
}
}
+
+ test(s"Test hudi_metadata Table-Valued Function") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val identifier = tableName
+ spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ |partitioned by (price)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.metadata.index.column.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'price'
+ |)
+ |location '${tmp.getCanonicalPath}/$tableName'
+ |""".stripMargin
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3',
3000, 30)
+ | """.stripMargin
+ )
+
+ val result2DF = spark.sql(
+ s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where type=1"
+ )
+ assert(result2DF.count() == 1)
+
+ val result3DF = spark.sql(
+ s"select type, key, filesystemmetadata from
hudi_metadata('$identifier') where type=2"
+ )
+ assert(result3DF.count() == 3)
+
+ val result4DF = spark.sql(
+ s"select type, key, ColumnStatsMetadata from
hudi_metadata('$identifier') where type=3"
+ )
+ assert(result4DF.count() == 3)
+
+ val result5DF = spark.sql(
+ s"select type, key, recordIndexMetadata from
hudi_metadata('$identifier') where type=5"
+ )
+ assert(result5DF.count() == 3)
+
+ val result6DF = spark.sql(
+ s"select type, key, BloomFilterMetadata from
hudi_metadata('$identifier') where BloomFilterMetadata is not null"
+ )
+ assert(result6DF.count() == 0)
+ }
+ }
+ }
+ spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala
new file mode 100644
index 00000000000..c4eca4bd0a9
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logcal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object HoodieMetadataTableValuedFunction {
+
+ val FUNC_NAME = "hudi_metadata";
+
+ def parseOptions(exprs: Seq[Expression], funcName: String): (String,
Map[String, String]) = {
+ val args = exprs.map(_.eval().toString)
+ if (args.size != 1) {
+ throw new AnalysisException(s"Expect arguments (table_name or
table_path) for function `$funcName`")
+ }
+
+ val identifier = args.head
+
+ (identifier, Map("hoodie.datasource.query.type" -> "snapshot"))
+ }
+}
+
+case class HoodieMetadataTableValuedFunction(args: Seq[Expression]) extends
LeafNode {
+
+ override def output: Seq[Attribute] = Nil
+
+ override lazy val resolved: Boolean = false
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index bc8edc72295..9c2f5bfb58c 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -24,7 +24,7 @@ import
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionBy
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName,
UnresolvedPartitionSpec}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Expression
-import
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
HoodieFileSystemViewTableValuedFunctionOptionsParser, HoodieQuery,
HoodieTableChanges, HoodieTableChangesOptionsParser,
HoodieTimelineTableValuedFunction,
HoodieTimelineTableValuedFunctionOptionsParser}
+import
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
HoodieFileSystemViewTableValuedFunctionOptionsParser,
HoodieMetadataTableValuedFunction, HoodieQuery, HoodieTableChanges,
HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction,
HoodieTimelineTableValuedFunctionOptionsParser}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.Origin
@@ -134,6 +134,21 @@ case class HoodieSpark32PlusResolveReferences(spark:
SparkSession) extends Rule[
catalogTable.location.toString))
LogicalRelation(relation, catalogTable)
}
+ case HoodieMetadataTableValuedFunction(args) =>
+ val (tablePath, opts) =
HoodieMetadataTableValuedFunction.parseOptions(args,
HoodieMetadataTableValuedFunction.FUNC_NAME)
+ val hoodieDataSource = new DefaultSource
+ if (tablePath.contains(Path.SEPARATOR)) {
+ // the first param is table path
+ val relation = hoodieDataSource.createRelation(spark.sqlContext, opts
++ Map("path" -> (tablePath + "/.hoodie/metadata")))
+ LogicalRelation(relation)
+ } else {
+ // the first param is table identifier
+ val tableId =
spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
+ val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
+ val relation = hoodieDataSource.createRelation(spark.sqlContext, opts
++ Map("path" ->
+ (catalogTable.location.toString + "/.hoodie/metadata")))
+ LogicalRelation(relation, catalogTable)
+ }
case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
// START: custom Hudi change: don't want to go to the spark mit
resolution so we resolve the source and target
// if they haven't been
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
index b5f71389fab..e87a6de8db9 100644
---
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction}
+import
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
HoodieMetadataTableValuedFunction, HoodieQuery, HoodieTableChanges,
HoodieTimelineTableValuedFunction}
object TableValuedFunctions {
@@ -43,6 +43,11 @@ object TableValuedFunctions {
FunctionIdentifier(HoodieFileSystemViewTableValuedFunction.FUNC_NAME),
new
ExpressionInfo(HoodieFileSystemViewTableValuedFunction.getClass.getCanonicalName,
HoodieFileSystemViewTableValuedFunction.FUNC_NAME),
(args: Seq[Expression]) => new
HoodieFileSystemViewTableValuedFunction(args)
+ ),
+ (
+ FunctionIdentifier(HoodieMetadataTableValuedFunction.FUNC_NAME),
+ new
ExpressionInfo(HoodieMetadataTableValuedFunction.getClass.getCanonicalName,
HoodieMetadataTableValuedFunction.FUNC_NAME),
+ (args: Seq[Expression]) => new HoodieMetadataTableValuedFunction(args)
)
)
}