This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a83916c4347ad9415e43e156fb4f0b3aba748959
Author: KnightChess <[email protected]>
AuthorDate: Wed Nov 30 09:02:07 2022 +0800

    [HUDI-5278] Support more conf to cluster procedure (#7304)
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  52 ++++
 .../procedures/RunClusteringProcedure.scala        |  91 ++++++-
 .../hudi/procedure/TestClusteringProcedure.scala   | 269 ++++++++++++++++++++-
 3 files changed, 401 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 1180845a6ed..8db88178f8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -680,5 +680,57 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return enumValue;
     }
+
+    public String getValue() {
+      return value;
+    }
   }
 
+  public enum ClusteringOperator {
+
+    /**
+     * Only schedule a clustering plan.
+     */
+    SCHEDULE("schedule"),
+
+    /**
+     * Only execute the pending clustering plans.
+     */
+    EXECUTE("execute"),
+
+    /**
+     * Schedule a clustering plan first, then execute all pending clustering plans.
+     */
+    SCHEDULE_AND_EXECUTE("scheduleandexecute");
+
+    private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
+        TypeUtils.getValueToEnumMap(ClusteringOperator.class, e -> e.value);
+
+    private final String value;
+
+    ClusteringOperator(String value) {
+      this.value = value;
+    }
+
+    @Nonnull
+    public static ClusteringOperator fromValue(String value) {
+      ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
+      if (enumValue == null) {
+        throw new HoodieException(String.format("Invalid value (%s)", value));
+      }
+      return enumValue;
+    }
+
+    public boolean isSchedule() {
+      return this != ClusteringOperator.EXECUTE;
+    }
+
+    public boolean isExecute() {
+      return this != ClusteringOperator.SCHEDULE;
+    }
+
+    public String getValue() {
+      return value;
+    }
+  }
 }
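A reading aid for the enum above (not part of the patch): the two predicates are deliberately non-exclusive, so SCHEDULE_AND_EXECUTE satisfies both. A minimal sketch in Scala, assuming only the members added in this commit:

    import org.apache.hudi.config.HoodieClusteringConfig.ClusteringOperator

    // Truth table implied by isSchedule/isExecute:
    //   SCHEDULE             -> isSchedule = true,  isExecute = false
    //   EXECUTE              -> isSchedule = false, isExecute = true
    //   SCHEDULE_AND_EXECUTE -> isSchedule = true,  isExecute = true
    val op = ClusteringOperator.fromValue("scheduleandexecute") // accepted values are lower-case
    assert(op.isSchedule && op.isExecute)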
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index fa5bbb33bbf..d34c0b0d7b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
+import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieClusteringConfig
+import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator, LayoutOptimizationStrategy}
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
 import org.apache.spark.internal.Logging
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.types._
 
+import java.util.Locale
 import java.util.function.Supplier
 
 import scala.collection.JavaConverters._
@@ -50,7 +52,12 @@ class RunClusteringProcedure extends BaseProcedure
     ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
     ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
+    ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
+    ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, None),
+    // options format: key=value,key2=value2
+    ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
+    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -72,6 +79,10 @@ class RunClusteringProcedure extends BaseProcedure
     val predicate = getArgValueOrDefault(args, PARAMETERS(2))
     val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
     val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+    val op = getArgValueOrDefault(args, PARAMETERS(5))
+    val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
+    val options = getArgValueOrDefault(args, PARAMETERS(7))
+    val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -100,24 +111,74 @@ class RunClusteringProcedure extends BaseProcedure
       logInfo("No order columns")
     }
 
+    orderStrategy match {
+      case Some(o) =>
+        val strategy = LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
+        conf = conf ++ Map(
+          HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> strategy.getValue
+        )
+      case _ =>
+        logInfo("No order strategy")
+    }
+
+    options match {
+      case Some(p) =>
+        val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
+        paramPairs.foreach { pair =>
+          val values = StringUtils.split(pair, "=")
+          conf = conf ++ Map(values.get(0) -> values.get(1))
+        }
+      case _ =>
+        logInfo("No options")
+    }
+
     // Get all pending clustering instants
     var pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
+
+    var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
+    pendingClustering = instantsStr match {
+      case Some(inst) =>
+        op match {
+          case Some(o) =>
+            if (!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
+              throw new HoodieClusteringException("Specific instants can only be used with the 'execute' op, or when no op is specified")
+            }
+          case _ =>
+            logInfo("No op specified; defaulting to EXECUTE because instants were given.")
+        }
+        operator = ClusteringOperator.EXECUTE
+        checkAndFilterPendingInstants(pendingClustering, inst.asInstanceOf[String])
+      case _ =>
+        logInfo("No specific instants")
+        op match {
+          case Some(o) =>
+            operator = ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
+          case _ =>
+            logInfo("No op specified; using the default scheduleAndExecute")
+        }
+        pendingClustering
+    }
+
     logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
 
     var client: SparkRDDWriteClient[_] = null
     try {
       client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
-      val instantTime = HoodieActiveTimeline.createNewInstantTime
-      if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
-        pendingClustering ++= Seq(instantTime)
+      if (operator.isSchedule) {
+        val instantTime = HoodieActiveTimeline.createNewInstantTime
+        if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
+          pendingClustering ++= Seq(instantTime)
+        }
       }
       logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
 
-      val startTs = System.currentTimeMillis()
-      pendingClustering.foreach(client.cluster(_, true))
-      logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
-        s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+      if (operator.isExecute) {
+        val startTs = System.currentTimeMillis()
+        pendingClustering.foreach(client.cluster(_, true))
+        logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
+          s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+      }
 
       val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
         .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
@@ -182,6 +243,16 @@ class RunClusteringProcedure extends BaseProcedure
     })
   }
 
+  private def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr: String): Seq[String] = {
+    val instants = StringUtils.split(instantStr, ",").asScala
+    val pendingSet = pendingInstants.toSet
+    val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
+    if (noneInstants.nonEmpty) {
+      throw new HoodieClusteringException(s"Specific instants ${noneInstants.mkString(",")} do not exist")
+    }
+    instants.sortBy(f => f)
+  }
+
 }
 
 object RunClusteringProcedure {
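For review context, a few illustrative invocations of the extended procedure (sketches, not part of the patch; 't1' and the instant timestamp are placeholders, and the option keys are ordinary Hudi write configs):

    // Only schedule a clustering plan; nothing is executed yet.
    spark.sql("call run_clustering(table => 't1', op => 'schedule')")

    // Only execute, restricted to specific pending instants (comma-separated).
    spark.sql("call run_clustering(table => 't1', op => 'execute', instants => '20221130090207001')")

    // Default scheduleAndExecute, with a layout strategy and extra configs
    // passed through `options` as key=value pairs separated by commas.
    spark.sql("call run_clustering(table => 't1', order => 'c2,c3', order_strategy => 'z-order', " +
      "options => 'hoodie.parquet.small.file.limit=0,hoodie.metadata.enable=true')")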
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 456f9c5066a..cc61db4a03d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -19,11 +19,21 @@
 package org.apache.spark.sql.hudi.procedure
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
+import org.apache.spark.sql.{Dataset, Row}
 
+import java.util
 import scala.collection.JavaConverters.asScalaIteratorConverter
 
 class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
@@ -385,4 +395,261 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
       }
     }
   }
+
+  test("Test Call run_clustering Procedure with specific instants") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+       """.stripMargin)
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      val instants = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+      assert(2 == instants.size)
+
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000, ${instants.head}')"
+      )("Specific instants 000000 do not exist")
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(2 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.size())
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      // specifying instants executes only those plans and never schedules a new one
+      spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants.mkString(",")}')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.size())
+
+      // test with operator schedule
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000', op => 'schedule')"
+      )("Specific instants can only be used with the 'execute' op, or when no op is specified")
+
+      // test with operator scheduleAndExecute
+      checkExceptionContain(
+        s"call run_clustering(table => '$tableName', instants => '000000', op => 'scheduleAndExecute')"
+      )("Specific instants can only be used with the 'execute' op, or when no op is specified")
+
+      // test with operator execute
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      metaClient.reloadActiveTimeline()
+      val instants2 = metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.iterator().asScala.map(_.getTimestamp).toSeq
+      spark.sql(s"call run_clustering(table => '$tableName', instants => '${instants2.mkString(",")}', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline.getInstants.size())
+    }
+  }
+
+  test("Test Call run_clustering Procedure op") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+       """.stripMargin)
+
+      writeRecords(2, 4, 0, basePath, Map("hoodie.avro.schema.validate" -> "false"))
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(metaClient.getActiveTimeline.filterPendingReplaceTimeline().empty())
+
+      // schedule only: a plan becomes pending but nothing is executed
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      metaClient.reloadActiveTimeline()
+      assert(0 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(1 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      // execute only: the pending plan completes
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+      metaClient.reloadActiveTimeline()
+      assert(1 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      // default op is scheduleAndExecute
+      spark.sql(s"call run_clustering(table => '$tableName')")
+      metaClient.reloadActiveTimeline()
+      assert(2 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      spark.sql(s"call run_clustering(table => '$tableName')")
+      metaClient.reloadActiveTimeline()
+      assert(3 == metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.size())
+      assert(0 == metaClient.getActiveTimeline.filterPendingReplaceTimeline().getInstants.size())
+
+      checkExceptionContain(s"call run_clustering(table => '$tableName', op => 'null')")("Invalid value")
+    }
+  }
+
+  test("Test Call run_clustering Procedure Order Strategy") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      val metadataOpts = Map(
+        HoodieMetadataConfig.ENABLE.key -> "true",
+        HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key() -> "true"
+      )
+
+      val queryOpts = metadataOpts ++ Map(
+        "path" -> basePath,
+        DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
+      )
+
+      val dataFilterC2 = EqualTo(AttributeReference("c2", StringType, nullable = false)(), Literal("foo23"))
+      val dataFilterC3 = EqualTo(AttributeReference("c3", StringType, nullable = false)(), Literal("bar23"))
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  c1 int,
+           |  c2 string,
+           |  c3 double
+           |) using hudi
+           | options (
+           |  primaryKey = 'c1',
+           |  type = 'cow',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.index.column.stats.enable = 'true',
+           |  hoodie.enable.data.skipping = 'true',
+           |  hoodie.datasource.write.operation = 'insert'
+           | )
+           | location '$basePath'
+       """.stripMargin)
+
+      val fileNum = 20
+      val numRecords = 400000
+
+      // insert records
+      writeRecords(fileNum, numRecords, 0, basePath, metadataOpts ++ Map("hoodie.avro.schema.validate" -> "false"))
+      val conf = new Configuration
+      val metaClient = HoodieTableMetaClient.builder.setConf(conf).setBasePath(basePath).build
+      // size output files from observed write stats so both runs produce a comparable file count
+      val avgSize = avgRecord(metaClient.getActiveTimeline)
+      val avgCount = Math.ceil(1.0 * numRecords / fileNum).toLong
+
+      spark.sql(
+        s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'linear', options => "
+           | hoodie.copyonwrite.record.size.estimate=$avgSize,
+           | hoodie.parquet.max.file.size=${avgSize * avgCount},
+           | hoodie.parquet.small.file.limit=0,
+           | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.metadata.enable=true,
+           | hoodie.metadata.index.column.stats.enable=true
+           |")""".stripMargin)
+
+      metaClient.reloadActiveTimeline()
+      val fileIndex1 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+      val orderAllFiles = fileIndex1.allFiles.size
+      val c2OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+      val c3OrderFilterCount = fileIndex1.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+      spark.sql(
+        s"""call run_clustering(table => '$tableName', order => 'c2,c3', order_strategy => 'z-order', options => "
+           | hoodie.copyonwrite.record.size.estimate=$avgSize,
+           | hoodie.parquet.max.file.size=${avgSize * avgCount},
+           | hoodie.parquet.small.file.limit=0,
+           | hoodie.clustering.plan.strategy.target.file.max.bytes=${avgSize * avgCount},
+           | hoodie.metadata.enable=true,
+           | hoodie.metadata.index.column.stats.enable=true
+           |")""".stripMargin)
+
+      metaClient.reloadActiveTimeline()
+      val fileIndex2 = HoodieFileIndex(spark, metaClient, None, queryOpts)
+      val ZOrderAllFiles = fileIndex2.allFiles.size
+      val c2ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC2)).head.files.size
+      val c3ZOrderFilterCount = fileIndex2.listFiles(Seq(), Seq(dataFilterC3)).head.files.size
+
+      // a linear sort on (c2, c3) skips best on the prefix column c2;
+      // z-order trades some c2 selectivity for better c3 selectivity
+      assert((1.0 * c2OrderFilterCount / orderAllFiles) < (1.0 * c2ZOrderFilterCount / ZOrderAllFiles))
+      assert((1.0 * c3OrderFilterCount / orderAllFiles) > (1.0 * c3ZOrderFilterCount / ZOrderAllFiles))
+    }
+  }
+
+  def avgRecord(commitTimeline: HoodieTimeline): Long = {
+    var totalByteSize = 0L
+    var totalRecordsCount = 0L
+    commitTimeline.getReverseOrderedInstants.toArray.foreach(instant => {
+      val commitMetadata = HoodieCommitMetadata.fromBytes(
+        commitTimeline.getInstantDetails(instant.asInstanceOf[HoodieInstant]).get, classOf[HoodieCommitMetadata])
+      totalByteSize = totalByteSize + commitMetadata.fetchTotalBytesWritten()
+      totalRecordsCount = totalRecordsCount + commitMetadata.fetchTotalRecordsWritten()
+    })
+
+    // average bytes per record across all commits
+    Math.ceil((1.0 * totalByteSize) / totalRecordsCount).toLong
+  }
+
+  def writeRecords(files: Int, numRecords: Int, partitions: Int, location: String, options: Map[String, String]): Unit = {
+    val records = new util.ArrayList[Row](numRecords)
+    val rowDimension = Math.ceil(Math.sqrt(numRecords)).toInt
+
+    val data = Stream.range(0, rowDimension, 1)
+      .flatMap(x => Stream.range(0, rowDimension, 1).map(y => Pair.of(x, y)))
+
+    if (partitions > 0) {
+      data.foreach { i =>
+        records.add(Row(i.getLeft % partitions, "foo" + i.getLeft, "bar" + i.getRight))
+      }
+    } else {
+      data.foreach { i =>
+        records.add(Row(i.getLeft, "foo" + i.getLeft, "bar" + i.getRight))
+      }
+    }
+
+    val struct = StructType(Array[StructField](
+      StructField("c1", DataTypes.IntegerType, nullable = true, Metadata.empty),
+      StructField("c2", DataTypes.StringType, nullable = true, Metadata.empty),
+      StructField("c3", DataTypes.StringType, nullable = true, Metadata.empty)
+    ))
+
+    // `files` is only a repartition hint; Hudi may write a different number of files
+    val df = spark.createDataFrame(records, struct).repartition(files)
+    writeDF(df, location, options)
+  }
+
+  def writeDF(df: Dataset[Row], location: String, options: Map[String, String]): Unit = {
+    df.select("c1", "c2", "c3")
+      .sortWithinPartitions("c1", "c2")
+      .write
+      .format("hudi")
+      .option(OPERATION.key(), WriteOperationType.INSERT.value())
+      .option(RECORDKEY_FIELD.key(), "c1")
+      .options(options)
+      .mode("append").save(location)
+  }
 }
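Two reading notes on the order-strategy test above (not part of the patch). First, the options block pins per-file sizing from observed write stats so that both clustering runs produce roughly fileNum files and the skipping ratios stay comparable; a sketch of that arithmetic with hypothetical numbers:

    // Hypothetical figures; in the test, avgSize comes from commit metadata via avgRecord().
    val fileNum = 20
    val numRecords = 400000
    val avgSize = 24L                                            // assumed bytes per record
    val avgCount = Math.ceil(1.0 * numRecords / fileNum).toLong  // 20000 records per file
    val maxFileBytes = avgSize * avgCount                        // 480000-byte cap per file
    // hoodie.parquet.max.file.size and
    // hoodie.clustering.plan.strategy.target.file.max.bytes are both set to maxFileBytes,
    // so clustering should again write roughly fileNum files.

Second, the assertions encode the expected trade-off: a linear sort on (c2, c3) gives the strongest data skipping on the prefix column c2, while z-order sacrifices some c2 selectivity to improve selectivity on c3.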
