This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d6d0438775 [HUDI-7110] Add call procedure for show column stats information (#10120)
8d6d0438775 is described below

commit 8d6d04387753662a5bb41f35874c6bbdd7021b36
Author: majian <47964462+majian1...@users.noreply.github.com>
AuthorDate: Thu Nov 23 10:08:17 2023 +0800

    [HUDI-7110] Add call procedure for show column stats information (#10120)
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |   2 +-
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../ShowMetadataTableColumnStatsProcedure.scala    | 169 +++++++++++++++++++++
 .../sql/hudi/procedure/TestMetadataProcedure.scala |  66 ++++++++
 4 files changed, 237 insertions(+), 1 deletion(-)
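
For context, a minimal usage sketch of the new procedure via Spark SQL. This is illustrative only: the table name 'hudi_tbl' and the column list are assumptions, and the table must have the metadata table and column stats index enabled (see the test below for the exact setup).

    // Hypothetical call of the new procedure from a Spark session (table name and columns are placeholders).
    // Returns one row per (file, column) with file_name, column_name, min_value, max_value, null_num.
    spark.sql("call show_metadata_table_column_stats(table => 'hudi_tbl', targetColumns => 'c1,c9')").show(false)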

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index dd76aee2f18..9cdb15092b0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -309,7 +309,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
   }
 
-  private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+  def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
     // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
     //    - Fetching the records from CSI by key-prefixes (encoded column names)
     //    - Extracting [[HoodieMetadataColumnStats]] records
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ad63ddbb29e..1a960ecb8fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -66,6 +66,7 @@ object HoodieProcedures {
       ,(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
       ,(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
       ,(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
+      ,(ShowMetadataTableColumnStatsProcedure.NAME, ShowMetadataTableColumnStatsProcedure.builder)
       ,(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
       ,(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
       ,(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala
new file mode 100644
index 00000000000..60aa0f054b9
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.generic.IndexedRecord
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.avro.model._
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.{Function, Supplier}
+import scala.collection.{JavaConversions, mutable}
+import scala.jdk.CollectionConverters.{asScalaBufferConverter, asScalaIteratorConverter}
+
+
+class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "partition", DataTypes.StringType),
+    ProcedureParameter.optional(2, "targetColumns", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("file_name", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("column_name", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("min_value", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("max_value", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("null_num", DataTypes.LongType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+    val partitions = getArgValueOrDefault(args, PARAMETERS(1)).getOrElse("").toString
+    val partitionsSeq = partitions.split(",").filter(_.nonEmpty).toSeq
+
+    val targetColumns = getArgValueOrDefault(args, PARAMETERS(2)).getOrElse("").toString
+    val targetColumnsSeq = targetColumns.split(",").toSeq
+    val basePath = getBasePath(table)
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .enable(true)
+      .build
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+    val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, shouldReadInMemory = false)
+    val fsView = buildFileSystemView(table)
+    val allFileSlices: Set[FileSlice] = {
+      if (partitionsSeq.isEmpty) {
+        val engineCtx = new HoodieSparkEngineContext(jsc)
+        val metaTable = HoodieTableMetadata.create(engineCtx, metadataConfig, basePath)
+        metaTable.getAllPartitionPaths
+          .asScala
+          .flatMap(path => fsView.getLatestFileSlices(path).iterator().asScala)
+          .toSet
+      } else {
+        partitionsSeq
+          .flatMap(partition => fsView.getLatestFileSlices(partition).iterator().asScala)
+          .toSet
+      }
+    }
+
+    val allFileNames: Set[String] = allFileSlices.map(_.getBaseFile.get().getFileName)
+
+    val rows = mutable.ListBuffer[Row]()
+    colStatsRecords.collectAsList().asScala
+      .filter(c => allFileNames.contains(c.getFileName))
+      .foreach { c =>
+      rows += Row(c.getFileName, c.getColumnName, getColumnStatsValue(c.getMinValue),
+        getColumnStatsValue(c.getMaxValue), c.getNullCount.longValue())
+    }
+
+    rows.toList
+  }
+
+  private def getColumnStatsValue(stats_value: Any): String = {
+    stats_value match {
+      case _: IntWrapper |
+           _: BooleanWrapper |
+           _: DateWrapper |
+           _: DoubleWrapper |
+           _: FloatWrapper |
+           _: LongWrapper |
+           _: StringWrapper |
+           _: TimeMicrosWrapper |
+           _: TimestampMicrosWrapper =>
+        String.valueOf(stats_value.asInstanceOf[IndexedRecord].get(0))
+      case _: BytesWrapper =>
+        val bytes_value = stats_value.asInstanceOf[BytesWrapper].getValue
+        util.Arrays.toString(bytes_value.array())
+      case _: DecimalWrapper =>
+        val decimal_value = stats_value.asInstanceOf[DecimalWrapper].getValue
+        util.Arrays.toString(decimal_value.array())
+      case _ =>
+        throw new HoodieException(s"Unsupported type: 
${stats_value.getClass.getSimpleName}")
+    }
+  }
+
+  def buildFileSystemView(table: Option[Any]): HoodieTableFileSystemView = {
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val fs = metaClient.getFs
+    val globPath = s"$basePath/*/*/*"
+    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+
+    val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+
+    val maxInstant = metaClient.createNewInstantTime()
+    val instants = timeline.getInstants.iterator().asScala.filter(_.getTimestamp < maxInstant)
+
+    val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]]
+      with java.io.Serializable {
+      override def apply(instant: HoodieInstant): HOption[Array[Byte]] = {
+        metaClient.getActiveTimeline.getInstantDetails(instant)
+      }
+    }
+
+    val filteredTimeline = new HoodieDefaultTimeline(
+      new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details)
+
+    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](statuses.size)))
+  }
+
+  override def build: Procedure = new ShowMetadataTableColumnStatsProcedure()
+}
+
+object ShowMetadataTableColumnStatsProcedure {
+  val NAME = "show_metadata_table_column_stats"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowMetadataTableColumnStatsProcedure()
+  }
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
index c618c227ce1..b3ce71c70eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
@@ -91,6 +91,72 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call show_metadata_table_column_stats Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 boolean,
+           |  c3 binary,
+           |  c4 date,
+           |  c5 decimal(10,1),
+           |  c6 double,
+           |  c7 float,
+           |  c8 long,
+           |  c9 string,
+           |  c10 timestamp
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'c1',
+           |  preCombineField = 'c8',
+           |  hoodie.metadata.enable="true",
+           |  hoodie.metadata.index.column.stats.enable="true"
+           | )
+       """.stripMargin)
+      // insert data to table
+
+      spark.sql(
+        s"""
+           |insert into table $tableName
+           |values (1, true, CAST('binary data' AS BINARY), CAST('2021-01-01' AS DATE), CAST(10.5 AS DECIMAL(10,1)), CAST(3.14 AS DOUBLE), CAST(2.5 AS FLOAT), 1000, 'example string', CAST('2021-01-01 00:00:00' AS TIMESTAMP))
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |insert into table $tableName
+           |values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE),  CAST(20.5 AS DECIMAL(10,1)), CAST(6.28 AS DOUBLE), CAST(3.14 AS FLOAT), 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP))
+           |""".stripMargin)
+
+      // Only numerical and string types are compared for clarity on min/max values.
+      val expectedValues = Map(
+        1 -> ("1", "10"),
+        2 -> ("false", "true"),
+        6 -> ("3.14", "6.28"),
+        7 -> ("2.5", "3.14"),
+        8 -> ("1000", "2000"),
+        9 -> ("another string", "example string")
+      )
+
+      for (i <- 1 to 10) {
+        val columnName = s"c$i"
+        val metadataStats = spark.sql(s"""call 
show_metadata_table_column_stats(table => '$tableName', targetColumns => 
'$columnName')""").collect()
+        assertResult(1)(metadataStats.length)
+        val minVal: String = metadataStats(0).getAs[String]("min_value")
+        val maxVal: String = metadataStats(0).getAs[String]("max_value")
+
+        expectedValues.get(i) match {
+          case Some((expectedMin, expectedMax)) =>
+            assertResult(expectedMin)(minVal)
+            assertResult(expectedMax)(maxVal)
+          case None => // Do nothing if no expected values found
+        }
+      }
+    }
+  }
+
   test("Test Call show_metadata_table_stats Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
