This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e0954040a9 [HUDI-3511] Add call procedure for MetadataCommand (#6018)
e0954040a9 is described below

commit e0954040a9ce490843a56d3497715bbc9eb7ffc0
Author: superche <[email protected]>
AuthorDate: Sun Jul 3 21:44:56 2022 +0800

    [HUDI-3511] Add call procedure for MetadataCommand (#6018)
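
    This change registers seven new Spark SQL call procedures for managing the
    Hudi metadata table: metadata_create, metadata_delete, metadata_init,
    show_metadata_stats, list_metadata_partitions, list_metadata_files and
    validate_metadata_files.

    Illustrative invocations (a sketch assuming a Hudi table named h0 is
    registered in Spark SQL; the partition value is hypothetical and depends
    on the table's partitioning):

        call metadata_delete(table => 'h0');
        call metadata_create(table => 'h0');
        call metadata_init(table => 'h0', readOnly => true);
        call show_metadata_stats(table => 'h0');
        call list_metadata_partitions(table => 'h0');
        call list_metadata_files(table => 'h0', partition => 'ts=1000');
        call validate_metadata_files(table => 'h0', verbose => true);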
---
 .../hudi/command/procedures/HoodieProcedures.scala |   7 +
 .../procedures/ListMetadataFilesProcedure.scala    |  83 +++++++
 .../ListMetadataPartitionsProcedure.scala          |  81 +++++++
 .../procedures/MetadataCreateProcedure.scala       |  80 +++++++
 .../procedures/MetadataDeleteProcedure.scala       |  71 ++++++
 .../command/procedures/MetadataInitProcedure.scala |  84 +++++++
 .../procedures/ShowMetadataStatsProcedure.scala    |  75 ++++++
 .../ValidateMetadataFilesProcedure.scala           | 147 ++++++++++++
 .../sql/hudi/procedure/TestMetadataProcedure.scala | 262 +++++++++++++++++++++
 9 files changed, 890 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b8893ac80d..0620e287d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -66,6 +66,13 @@ object HoodieProcedures {
     mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
     mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
     mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
+    mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder)
+    mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder)
+    mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder)
+    mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder)
+    mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder)
+    mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
+    mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala
new file mode 100644
index 0000000000..591293e08d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+
+class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "partition", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("file_path", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+    val partition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val config = HoodieMetadataConfig.newBuilder.enable(true).build
+    val metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+      config, basePath, "/tmp")
+    if (!metaReader.enabled) {
+      throw new HoodieException("Metadata Table not enabled/initialized.")
+    }
+
+    val timer = new HoodieTimer().startTimer
+    val statuses = metaReader.getAllFilesInPartition(new Path(basePath, partition))
+    logDebug("Took " + timer.endTimer + " ms")
+
+    val rows = new util.ArrayList[Row]
+    statuses.toStream.sortBy(p => p.getPath.getName).foreach((f: FileStatus) => {
+      rows.add(Row(f.getPath.getName))
+    })
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ListMetadataFilesProcedure()
+}
+
+object ListMetadataFilesProcedure {
+  val NAME = "list_metadata_files"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ListMetadataFilesProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala
new file mode 100644
index 0000000000..4c0bf15d90
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.Collections
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class ListMetadataPartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+
+    val basePath = getBasePath(table)
+    val config = HoodieMetadataConfig.newBuilder.enable(true).build
+    val metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc),
+      config, basePath, "/tmp")
+    if (!metadata.enabled) {
+      throw new HoodieException("Metadata Table not enabled/initialized.")
+    }
+
+    val timer = new HoodieTimer().startTimer
+    val partitions = metadata.getAllPartitionPaths
+    Collections.sort(partitions)
+    logDebug("Took " + timer.endTimer + " ms")
+
+    val rows = new util.ArrayList[Row]
+    partitions.stream.iterator().asScala.foreach((p: String) => {
+      rows.add(Row(p))
+    })
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ListMetadataPartitionsProcedure()
+}
+
+object ListMetadataPartitionsProcedure {
+  val NAME = "list_metadata_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ListMetadataPartitionsProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala
new file mode 100644
index 0000000000..e20459ea36
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
+
+    try {
+      val statuses = metaClient.getFs.listStatus(metadataPath)
+      if (statuses.nonEmpty) {
+        throw new RuntimeException("Metadata directory (" + metadataPath.toString + ") not empty.")
+      }
+    } catch {
+      case _: FileNotFoundException =>
+        // Metadata directory does not exist yet
+        metaClient.getFs.mkdirs(metadataPath)
+    }
+    val timer = new HoodieTimer().startTimer
+    val writeConfig = getWriteConfig(basePath)
+    SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc))
+    Seq(Row("Created Metadata Table in " +  metadataPath + " (duration=" + 
timer.endTimer / 1000.0 + "secs)"))
+  }
+
+  override def build = new MetadataCreateProcedure()
+}
+
+object MetadataCreateProcedure {
+  val NAME = "metadata_create"
+  var metadataBaseDirectory: Option[String] = None
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new MetadataCreateProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala
new file mode 100644
index 0000000000..216a365117
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
+
+    try {
+      val statuses = metaClient.getFs.listStatus(metadataPath)
+      if (statuses.nonEmpty) metaClient.getFs.delete(metadataPath, true)
+    } catch {
+      case _: FileNotFoundException =>
+        // Metadata directory does not exist
+    }
+    Seq(Row("Removed Metadata Table from " + metadataPath))
+  }
+
+  override def build = new MetadataDeleteProcedure()
+}
+
+object MetadataDeleteProcedure {
+  val NAME = "metadata_delete"
+  var metadataBaseDirectory: Option[String] = None
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new MetadataDeleteProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala
new file mode 100644
index 0000000000..acd1532a97
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val readOnly = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
+    try {
+      metaClient.getFs.listStatus(metadataPath)
+    } catch {
+      case _: FileNotFoundException =>
+        // Metadata directory does not exist yet
+        throw new RuntimeException("Metadata directory (" + metadataPath.toString + ") does not exist.")
+    }
+
+    val timer = new HoodieTimer().startTimer
+    if (!readOnly) {
+      val writeConfig = getWriteConfig(basePath)
+      SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc))
+    }
+
+    val action = if (readOnly) "Opened" else "Initialized"
+    Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + " sec)"))
+  }
+
+  override def build = new MetadataInitProcedure()
+}
+
+object MetadataInitProcedure {
+  val NAME = "metadata_init"
+  var metadataBaseDirectory: Option[String] = None
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new MetadataInitProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala
new file mode 100644
index 0000000000..9a73a51fd1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+class ShowMetadataStatsProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("stat_key", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("stat_value", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val config = HoodieMetadataConfig.newBuilder.enable(true).build
+    val metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+      config, basePath, "/tmp")
+    val stats = metadata.stats
+
+    val rows = new util.ArrayList[Row]
+    for (entry <- stats.entrySet) {
+      rows.add(Row(entry.getKey, entry.getValue))
+    }
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowMetadataStatsProcedure()
+}
+
+object ShowMetadataStatsProcedure {
+  val NAME = "show_metadata_stats"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowMetadataStatsProcedure()
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala
new file mode 100644
index 0000000000..b3c125942a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.Collections
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class ValidateMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("file_name", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("is_present_in_fs", DataTypes.BooleanType, nullable = true, 
Metadata.empty),
+    StructField("is_resent_in_metadata", DataTypes.BooleanType, nullable = 
true, Metadata.empty),
+    StructField("fs_size", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("metadata_size", DataTypes.LongType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+    val verbose = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val config = HoodieMetadataConfig.newBuilder.enable(true).build
+    val metadataReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+      config, basePath, "/tmp")
+
+    if (!metadataReader.enabled) {
+      throw new HoodieException("Metadata Table not enabled/initialized.")
+    }
+
+    val fsConfig = HoodieMetadataConfig.newBuilder.enable(false).build
+    val fsMetaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+      fsConfig, basePath, "/tmp")
+
+    val timer = new HoodieTimer().startTimer
+    val metadataPartitions = metadataReader.getAllPartitionPaths
+    logDebug("Listing partitions Took " + timer.endTimer + " ms")
+    val fsPartitions = fsMetaReader.getAllPartitionPaths
+    Collections.sort(fsPartitions)
+    Collections.sort(metadataPartitions)
+
+    val allPartitions = new util.HashSet[String]
+    allPartitions.addAll(fsPartitions)
+    allPartitions.addAll(metadataPartitions)
+
+    if (!fsPartitions.equals(metadataPartitions)) {
+      logError("FS partition listing does not match metadata partition listing!")
+      logError("All FS partitions: " + util.Arrays.toString(fsPartitions.toArray))
+      logError("All Metadata partitions: " + util.Arrays.toString(metadataPartitions.toArray))
+    }
+
+    val rows = new util.ArrayList[Row]
+    for (partition <- allPartitions) {
+      val fileStatusMap = new util.HashMap[String, FileStatus]
+      val metadataFileStatusMap = new util.HashMap[String, FileStatus]
+      val metadataStatuses = metadataReader.getAllFilesInPartition(new Path(basePath, partition))
+      util.Arrays.stream(metadataStatuses).iterator().asScala.foreach((entry: FileStatus) => metadataFileStatusMap.put(entry.getPath.getName, entry))
+      val fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(basePath, partition))
+      util.Arrays.stream(fsStatuses).iterator().asScala.foreach((entry: FileStatus) => fileStatusMap.put(entry.getPath.getName, entry))
+      val allFiles = new util.HashSet[String]
+      allFiles.addAll(fileStatusMap.keySet)
+      allFiles.addAll(metadataFileStatusMap.keySet)
+      for (file <- allFiles) {
+        val fsFileStatus = fileStatusMap.get(file)
+        val metaFileStatus = metadataFileStatusMap.get(file)
+        val doesFsFileExists = fsFileStatus != null
+        val doesMetadataFileExists = metaFileStatus != null
+        val fsFileLength = if (doesFsFileExists) fsFileStatus.getLen else 0
+        val metadataFileLength = if (doesMetadataFileExists) metaFileStatus.getLen else 0
+        if (verbose) { // if verbose, print all files
+          rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength))
+        } else if ((doesFsFileExists != doesMetadataFileExists) || (fsFileLength != metadataFileLength)) { // if not verbose, print only non-matching files
+          rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength))
+        }
+      }
+      if (metadataStatuses.length != fsStatuses.length) {
+        logError("FS and metadata file counts do not match for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count " + metadataStatuses.length)
+      }
+      for (entry <- fileStatusMap.entrySet) {
+        if (!metadataFileStatusMap.containsKey(entry.getKey)) {
+          logError("FS file not found in metadata " + entry.getKey)
+        } else if (entry.getValue.getLen != metadataFileStatusMap.get(entry.getKey).getLen) {
+          logError("FS file size mismatch " + entry.getKey + ", size equality " + (entry.getValue.getLen == metadataFileStatusMap.get(entry.getKey).getLen) + ". FS size " + entry.getValue.getLen + ", metadata size " + metadataFileStatusMap.get(entry.getKey).getLen)
+        }
+      }
+      for (entry <- metadataFileStatusMap.entrySet) {
+        if (!fileStatusMap.containsKey(entry.getKey)) {
+          logError("Metadata file not found in FS " + entry.getKey)
+        } else if (entry.getValue.getLen != fileStatusMap.get(entry.getKey).getLen) {
+          logError("Metadata file size mismatch " + entry.getKey + ", size equality " + (entry.getValue.getLen == fileStatusMap.get(entry.getKey).getLen) + ". Metadata size " + entry.getValue.getLen + ", FS size " + fileStatusMap.get(entry.getKey).getLen)
+        }
+      }
+    }
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ValidateMetadataFilesProcedure()
+}
+
+object ValidateMetadataFilesProcedure {
+  val NAME = "validate_metadata_files"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ValidateMetadataFilesProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
new file mode 100644
index 0000000000..9dbb8f22ec
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestMetadataProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call metadata_delete Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // delete the metadata
+      val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
+      assertResult(1) {
+        deleteResult.length
+      }
+    }
+  }
+
+  test("Test Call metadata_create Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // The first step is to delete the metadata table
+      val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
+      assertResult(1) {
+        deleteResult.length
+      }
+
+      // The second step is to create the metadata table
+      val createResult = spark.sql(s"""call metadata_create(table => '$tableName')""").collect()
+      assertResult(1) {
+        createResult.length
+      }
+    }
+  }
+
+  test("Test Call metadata_init Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // read only, no initialization
+      val readResult = spark.sql(s"""call metadata_init(table => '$tableName', readOnly => true)""").collect()
+      assertResult(1) {
+        readResult.length
+      }
+
+      // initialize metadata
+      val initResult = spark.sql(s"""call metadata_init(table => '$tableName')""").collect()
+      assertResult(1) {
+        initResult.length
+      }
+    }
+  }
+
+  test("Test Call show_metadata_stats Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  hoodie.metadata.metrics.enable = 'true'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // collect metadata stats for table
+      val metadataStats = spark.sql(s"""call show_metadata_stats(table => '$tableName')""").collect()
+      assertResult(0) {
+        metadataStats.length
+      }
+    }
+  }
+
+  test("Test Call list_metadata_partitions Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | partitioned by (ts)
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // collect metadata partitions for table
+      val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect()
+      assertResult(2) {
+        partitions.length
+      }
+    }
+  }
+
+  test("Test Call list_metadata_files Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | partitioned by (ts)
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // collect metadata partitions for table
+      val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect()
+      assertResult(2) {
+        partitions.length
+      }
+
+      // collect metadata files for a partition of a table
+      val partition = partitions(0).get(0).toString
+      val filesResult = spark.sql(s"""call list_metadata_files(table => '$tableName', partition => '$partition')""").collect()
+      assertResult(1) {
+        filesResult.length
+      }
+    }
+  }
+
+  test("Test Call validate_metadata_files Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | partitioned by (ts)
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // validate metadata files
+      val validateFilesResult = spark.sql(s"""call validate_metadata_files(table => '$tableName')""").collect()
+      assertResult(0) {
+        validateFilesResult.length
+      }
+
+      // validate metadata files with verbose output
+      val validateFilesVerboseResult = spark.sql(s"""call validate_metadata_files(table => '$tableName', verbose => true)""").collect()
+      assertResult(2) {
+        validateFilesVerboseResult.length
+      }
+    }
+  }
+}
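
A sketch of the output the partition-listing procedure would produce for the
partitioned test tables above (two inserts with ts = 1000 and ts = 1500 yield
two metadata partitions; the table name h0 and the exact partition names are
illustrative, since the latter depend on the configured key generator):

    spark-sql> call list_metadata_partitions(table => 'h0');
    ts=1000
    ts=1500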
