This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e0954040a9 [HUDI-3511] Add call procedure for MetadataCommand (#6018)
e0954040a9 is described below
commit e0954040a9ce490843a56d3497715bbc9eb7ffc0
Author: superche <[email protected]>
AuthorDate: Sun Jul 3 21:44:56 2022 +0800
[HUDI-3511] Add call procedure for MetadataCommand (#6018)
---
.../hudi/command/procedures/HoodieProcedures.scala | 7 +
.../procedures/ListMetadataFilesProcedure.scala | 83 +++++++
.../ListMetadataPartitionsProcedure.scala | 81 +++++++
.../procedures/MetadataCreateProcedure.scala | 80 +++++++
.../procedures/MetadataDeleteProcedure.scala | 71 ++++++
.../command/procedures/MetadataInitProcedure.scala | 84 +++++++
.../procedures/ShowMetadataStatsProcedure.scala | 75 ++++++
.../ValidateMetadataFilesProcedure.scala | 147 ++++++++++++
.../sql/hudi/procedure/TestMetadataProcedure.scala | 262 +++++++++++++++++++++
9 files changed, 890 insertions(+)
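For context, the seven procedures added below are invoked through Spark SQL's
CALL syntax. A minimal sketch of a session that exercises them (assuming a
local spark-shell with the Hudi Spark bundle on the classpath; the table name
is hypothetical):

    import org.apache.spark.sql.SparkSession

    // Hudi's SQL extension registers the CALL statement and these procedures.
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    // Each procedure takes the Hudi table name as a named argument.
    spark.sql("call metadata_delete(table => 'hudi_trips')").show()
    spark.sql("call metadata_create(table => 'hudi_trips')").show()
    spark.sql("call list_metadata_partitions(table => 'hudi_trips')").show()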
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b8893ac80d..0620e287d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -66,6 +66,13 @@ object HoodieProcedures {
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
+ mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder)
+ mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder)
+ mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder)
+ mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder)
+ mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder)
+ mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder)
+ mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder)
mapBuilder.build
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala
new file mode 100644
index 0000000000..591293e08d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+
+class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.optional(1, "partition", DataTypes.StringType, None)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("file_path", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val table = getArgValueOrDefault(args, PARAMETERS(0))
+ val partition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+
+ val basePath = getBasePath(table)
+ val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val config = HoodieMetadataConfig.newBuilder.enable(true).build
+ val metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+ config, basePath, "/tmp")
+ if (!metaReader.enabled) {
+ throw new HoodieException(s"Metadata Table not enabled/initialized.")
+ }
+
+ val timer = new HoodieTimer().startTimer
+ val statuses = metaReader.getAllFilesInPartition(new Path(basePath, partition))
+ logDebug("Took " + timer.endTimer + " ms")
+
+ val rows = new util.ArrayList[Row]
+ statuses.toStream.sortBy(p => p.getPath.getName).foreach((f: FileStatus) => {
+ rows.add(Row(f.getPath.getName))
+ })
+ rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+ }
+
+ override def build: Procedure = new ListMetadataFilesProcedure()
+}
+
+object ListMetadataFilesProcedure {
+ val NAME = "list_metadata_files"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new ListMetadataFilesProcedure()
+ }
+}
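For reference, a sketch of calling this procedure from Spark SQL (table and
partition values are hypothetical; note the partition argument, although
declared optional, is dereferenced with .get and should always be supplied):

    // Lists the file names the metadata table tracks for one partition,
    // sorted by file name.
    spark.sql("call list_metadata_files(table => 'hudi_trips', partition => 'ts=1000')").show()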
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala
new file mode 100644
index 0000000000..4c0bf15d90
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.Collections
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class ListMetadataPartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("partition", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val table = getArgValueOrDefault(args, PARAMETERS(0))
+
+ val basePath = getBasePath(table)
+ val config = HoodieMetadataConfig.newBuilder.enable(true).build
+ val metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc),
+ config, basePath, "/tmp")
+ if (!metadata.enabled) {
+ throw new HoodieException(s"Metadata Table not enabled/initialized.")
+ }
+
+ val timer = new HoodieTimer().startTimer
+ val partitions = metadata.getAllPartitionPaths
+ Collections.sort(partitions)
+ logDebug("Took " + timer.endTimer + " ms")
+
+ val rows = new util.ArrayList[Row]
+ partitions.stream.iterator().asScala.foreach((p: String) => {
+ rows.add(Row(p))
+ })
+ rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+ }
+
+ override def build: Procedure = new ListMetadataPartitionsProcedure()
+}
+
+object ListMetadataPartitionsProcedure {
+ val NAME = "list_metadata_partitions"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new ListMetadataPartitionsProcedure()
+ }
+}
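A usage sketch, assuming a session with the Hudi SQL extension enabled and a
hypothetical table name:

    // Returns one row per partition path recorded in the metadata table,
    // in sorted order.
    spark.sql("call list_metadata_partitions(table => 'hudi_trips')").show()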
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala
new file mode 100644
index 0000000000..e20459ea36
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("result", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+
+ val basePath = getBasePath(tableName)
+ val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
+
+ try {
+ val statuses = metaClient.getFs.listStatus(metadataPath)
+ if (statuses.nonEmpty) {
+ throw new RuntimeException("Metadata directory (" +
metadataPath.toString + ") not empty.")
+ }
+ } catch {
+ case e: FileNotFoundException =>
+ // Metadata directory does not exist yet
+ metaClient.getFs.mkdirs(metadataPath)
+ }
+ val timer = new HoodieTimer().startTimer
+ val writeConfig = getWriteConfig(basePath)
+ SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc))
+ Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + " secs)"))
+ }
+
+ override def build = new MetadataCreateProcedure()
+}
+
+object MetadataCreateProcedure {
+ val NAME = "metadata_create"
+ var metadataBaseDirectory: Option[String] = None
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new MetadataCreateProcedure()
+ }
+}
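A usage sketch (hypothetical table name). Note the procedure refuses to run
if the metadata directory under <basePath>/.hoodie/metadata already exists
and is non-empty:

    // Bootstraps the metadata table and reports the time taken.
    spark.sql("call metadata_create(table => 'hudi_trips')").show()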
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala
new file mode 100644
index 0000000000..216a365117
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("result", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val basePath = getBasePath(tableName)
+ val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
+
+ try {
+ val statuses = metaClient.getFs.listStatus(metadataPath)
+ if (statuses.nonEmpty) metaClient.getFs.delete(metadataPath, true)
+ } catch {
+ case e: FileNotFoundException =>
+ // Metadata directory does not exist
+ }
+ Seq(Row("Removed Metadata Table from " + metadataPath))
+ }
+
+ override def build = new MetadataDeleteProcedure()
+}
+
+object MetadataDeleteProcedure {
+ val NAME = "metadata_delete"
+ var metadataBaseDirectory: Option[String] = None
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new MetadataDeleteProcedure()
+ }
+}
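A usage sketch (hypothetical table name); the call is effectively a no-op if
the metadata directory does not exist:

    // Recursively deletes the metadata table directory.
    spark.sql("call metadata_delete(table => 'hudi_trips')").show()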
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala
new file mode 100644
index 0000000000..acd1532a97
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("result", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val readOnly = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+
+ val basePath = getBasePath(tableName)
+ val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath))
+ try {
+ metaClient.getFs.listStatus(metadataPath)
+ } catch {
+ case e: FileNotFoundException =>
+ // Metadata directory does not exist yet
+ throw new RuntimeException("Metadata directory (" +
metadataPath.toString + ") does not exist.")
+ }
+
+ val timer = new HoodieTimer().startTimer
+ if (!readOnly) {
+ val writeConfig = getWriteConfig(basePath)
+ SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc))
+ }
+
+ val action = if (readOnly) "Opened" else "Initialized"
+ Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" +
timer.endTimer / 1000.0 + "sec)"))
+ }
+
+ override def build = new MetadataInitProcedure()
+}
+
+object MetadataInitProcedure {
+ val NAME = "metadata_init"
+ var metadataBaseDirectory: Option[String] = None
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new MetadataInitProcedure()
+ }
+}
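A usage sketch (hypothetical table name). With readOnly => true the procedure
only verifies that the metadata directory exists; otherwise it also creates
the metadata writer, initializing the table if needed:

    spark.sql("call metadata_init(table => 'hudi_trips', readOnly => true)").show()
    spark.sql("call metadata_init(table => 'hudi_trips')").show()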
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala
new file mode 100644
index 0000000000..9a73a51fd1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+class ShowMetadataStatsProcedure() extends BaseProcedure with ProcedureBuilder {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("stat_key", DataTypes.StringType, nullable = true,
Metadata.empty),
+ StructField("stat_value", DataTypes.StringType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val table = getArgValueOrDefault(args, PARAMETERS(0))
+
+ val basePath = getBasePath(table)
+ val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val config = HoodieMetadataConfig.newBuilder.enable(true).build
+ val metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+ config, basePath, "/tmp")
+ val stats = metadata.stats
+
+ val rows = new util.ArrayList[Row]
+ for (entry <- stats.entrySet) {
+ rows.add(Row(entry.getKey, entry.getValue))
+ }
+ rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+ }
+
+ override def build: Procedure = new ShowMetadataStatsProcedure()
+}
+
+object ShowMetadataStatsProcedure {
+ val NAME = "show_metadata_stats"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new ShowMetadataStatsProcedure()
+ }
+}
+
+
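A usage sketch (hypothetical table name). Stats rows are only produced when
metadata metrics have been enabled on the table, e.g. via the
hoodie.metadata.metrics.enable table property:

    // Emits one (stat_key, stat_value) row per recorded metric.
    spark.sql("call show_metadata_stats(table => 'hudi_trips')").show()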
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala
new file mode 100644
index 0000000000..b3c125942a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.HoodieTimer
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieBackedTableMetadata
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.Collections
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class ValidateMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("partition", DataTypes.StringType, nullable = true,
Metadata.empty),
+ StructField("file_name", DataTypes.StringType, nullable = true,
Metadata.empty),
+ StructField("is_present_in_fs", DataTypes.BooleanType, nullable = true,
Metadata.empty),
+ StructField("is_resent_in_metadata", DataTypes.BooleanType, nullable =
true, Metadata.empty),
+ StructField("fs_size", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("metadata_size", DataTypes.LongType, nullable = true,
Metadata.empty)
+ ))
+
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val table = getArgValueOrDefault(args, PARAMETERS(0))
+ val verbose = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+
+ val basePath = getBasePath(table)
+ val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val config = HoodieMetadataConfig.newBuilder.enable(true).build
+ val metadataReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+ config, basePath, "/tmp")
+
+ if (!metadataReader.enabled) {
+ throw new HoodieException(s"Metadata Table not enabled/initialized.")
+ }
+
+ val fsConfig = HoodieMetadataConfig.newBuilder.enable(false).build
+ val fsMetaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf),
+ fsConfig, basePath, "/tmp")
+
+ val timer = new HoodieTimer().startTimer
+ val metadataPartitions = metadataReader.getAllPartitionPaths
+ logDebug("Listing partitions Took " + timer.endTimer + " ms")
+ val fsPartitions = fsMetaReader.getAllPartitionPaths
+ Collections.sort(fsPartitions)
+ Collections.sort(metadataPartitions)
+
+ val allPartitions = new util.HashSet[String]
+ allPartitions.addAll(fsPartitions)
+ allPartitions.addAll(metadataPartitions)
+
+ if (!fsPartitions.equals(metadataPartitions)) {
+ logError("FS partition listing is not matching with metadata partition
listing!")
+ logError("All FS partitions: " +
util.Arrays.toString(fsPartitions.toArray))
+ logError("All Metadata partitions: " +
util.Arrays.toString(metadataPartitions.toArray))
+ }
+
+ val rows = new util.ArrayList[Row]
+ for (partition <- allPartitions) {
+ val fileStatusMap = new util.HashMap[String, FileStatus]
+ val metadataFileStatusMap = new util.HashMap[String, FileStatus]
+ val metadataStatuses = metadataReader.getAllFilesInPartition(new Path(basePath, partition))
+ util.Arrays.stream(metadataStatuses).iterator().asScala.foreach((entry: FileStatus) => metadataFileStatusMap.put(entry.getPath.getName, entry))
+ val fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(basePath, partition))
+ util.Arrays.stream(fsStatuses).iterator().asScala.foreach((entry: FileStatus) => fileStatusMap.put(entry.getPath.getName, entry))
+ val allFiles = new util.HashSet[String]
+ allFiles.addAll(fileStatusMap.keySet)
+ allFiles.addAll(metadataFileStatusMap.keySet)
+ for (file <- allFiles) {
+ val fsFileStatus = fileStatusMap.get(file)
+ val metaFileStatus = metadataFileStatusMap.get(file)
+ val doesFsFileExists = fsFileStatus != null
+ val doesMetadataFileExists = metaFileStatus != null
+ val fsFileLength = if (doesFsFileExists) fsFileStatus.getLen else 0
+ val metadataFileLength = if (doesMetadataFileExists) metaFileStatus.getLen else 0
+ if (verbose) { // if verbose, print all files
+ rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength))
+ } else if ((doesFsFileExists != doesMetadataFileExists) || (fsFileLength != metadataFileLength)) { // if non-verbose, print only non-matching files
+ rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength))
+ }
+ }
+ if (metadataStatuses.length != fsStatuses.length) {
+ logError("FS and metadata file counts do not match for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count " + metadataStatuses.length)
+ }
+ for (entry <- fileStatusMap.entrySet) {
+ if (!metadataFileStatusMap.containsKey(entry.getKey)) {
+ logError("FS file not found in metadata " + entry.getKey)
+ } else if (entry.getValue.getLen != metadataFileStatusMap.get(entry.getKey).getLen) {
+ logError("FS file size mismatch " + entry.getKey + ", size equality " + (entry.getValue.getLen == metadataFileStatusMap.get(entry.getKey).getLen) + ". FS size " + entry.getValue.getLen + ", metadata size " + metadataFileStatusMap.get(entry.getKey).getLen)
+ }
+ }
+ for (entry <- metadataFileStatusMap.entrySet) {
+ if (!fileStatusMap.containsKey(entry.getKey)) {
+ logError("Metadata file not found in FS " + entry.getKey)
+ } else if (entry.getValue.getLen != fileStatusMap.get(entry.getKey).getLen) {
+ logError("Metadata file size mismatch " + entry.getKey + ", size equality " + (entry.getValue.getLen == fileStatusMap.get(entry.getKey).getLen) + ". Metadata size " + entry.getValue.getLen + ", FS size " + fileStatusMap.get(entry.getKey).getLen)
+ }
+ }
+ }
+ rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+ }
+
+ override def build: Procedure = new ValidateMetadataFilesProcedure()
+}
+
+object ValidateMetadataFilesProcedure {
+ val NAME = "validate_metadata_files"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new ValidateMetadataFilesProcedure()
+ }
+}
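A usage sketch (hypothetical table name). The procedure compares direct
file-system listings against the metadata table's view of each partition:

    // verbose => true returns every file; the default returns only files
    // whose presence or size differs between FS and metadata.
    spark.sql("call validate_metadata_files(table => 'hudi_trips', verbose => true)").show()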
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
new file mode 100644
index 0000000000..9dbb8f22ec
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestMetadataProcedure extends HoodieSparkSqlTestBase {
+
+ test("Test Call metadata_delete Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // delete the metadata
+ val deleteResult = spark.sql(s"""call metadata_delete(table =>
'$tableName')""").collect()
+ assertResult(1) {
+ deleteResult.length
+ }
+ }
+ }
+
+ test("Test Call metadata_create Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // The first step is to delete the metadata table
+ val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect()
+ assertResult(1) {
+ deleteResult.length
+ }
+
+ // The second step is to re-create the metadata table
+ val createResult = spark.sql(s"""call metadata_create(table => '$tableName')""").collect()
+ assertResult(1) {
+ createResult.length
+ }
+ }
+ }
+
+ test("Test Call metadata_init Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // read only, no initialize
+ val readResult = spark.sql(s"""call metadata_init(table => '$tableName',
readOnly => true)""").collect()
+ assertResult(1) {
+ readResult.length
+ }
+
+ // initialize metadata
+ val initResult = spark.sql(s"""call metadata_init(table =>
'$tableName')""").collect()
+ assertResult(1) {
+ initResult.length
+ }
+ }
+ }
+
+ test("Test Call show_metadata_stats Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.metadata.metrics.enable = 'true'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // collect metadata stats for table
+ val metadataStats = spark.sql(s"""call show_metadata_stats(table =>
'$tableName')""").collect()
+ assertResult(0) {
+ metadataStats.length
+ }
+ }
+ }
+
+ test("Test Call list_metadata_partitions Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | partitioned by (ts)
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // collect metadata partitions for table
+ val partitions = spark.sql(s"""call list_metadata_partitions(table =>
'$tableName')""").collect()
+ assertResult(2) {
+ partitions.length
+ }
+ }
+ }
+
+ test("Test Call list_metadata_files Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | partitioned by (ts)
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // collect metadata partitions for table
+ val partitions = spark.sql(s"""call list_metadata_partitions(table =>
'$tableName')""").collect()
+ assertResult(2) {
+ partitions.length
+ }
+
+ // collect metadata files for a partition of a table
+ val partition = partitions(0).get(0).toString
+ val filesResult = spark.sql(s"""call list_metadata_files(table =>
'$tableName', partition => '$partition')""").collect()
+ assertResult(1) {
+ filesResult.length
+ }
+ }
+ }
+
+ test("Test Call validate_metadata_files Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | partitioned by (ts)
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // collect validate metadata files result
+ val validateFilesResult = spark.sql(s"""call
validate_metadata_files(table => '$tableName')""").collect()
+ assertResult(0) {
+ validateFilesResult.length
+ }
+
+ // collect validate metadata files result with verbose
+ val validateFilesVerboseResult = spark.sql(s"""call
validate_metadata_files(table => '$tableName', verbose => true)""").collect()
+ assertResult(2) {
+ validateFilesVerboseResult.length
+ }
+ }
+ }
+}