This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1ccf37f2287484c83f078a6d2dbb7e61f263640a Author: StreamingFlames <[email protected]> AuthorDate: Wed Feb 1 13:53:19 2023 +0800 [HUDI-5540] Close write client after usage of DeleteMarker/RollbackToInstantTime/RunClean/RunCompactionProcedure (#7655) --- .../command/procedures/DeleteMarkerProcedure.scala | 8 +- .../RollbackToInstantTimeProcedure.scala | 50 ++++---- .../command/procedures/RunCleanProcedure.scala | 27 +++-- .../procedures/RunCompactionProcedure.scala | 126 +++++++++++---------- 4 files changed, 121 insertions(+), 90 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index bfbab32599b..d99a5489799 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi.command.procedures +import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.table.marker.WriteMarkersFactory import org.apache.spark.internal.Logging @@ -47,8 +48,9 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] val basePath = getBasePath(tableName) + var client: SparkRDDWriteClient[_] = null val result = Try { - val client = createHoodieClient(jsc, basePath) + client = createHoodieClient(jsc, basePath) val config = client.getConfig val context = client.getEngineContext val table = HoodieSparkTable.create(config, context) @@ -63,6 +65,10 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log false } + if (client != null) { + client.close() + } + Seq(Row(result)) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala index 1fcc665d611..c8109bd56e2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion @@ -52,28 +53,35 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table) val basePath = hoodieCatalogTable.tableLocation - val client = createHoodieClient(jsc, basePath) - client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false") - val config = getWriteConfig(basePath) - val metaClient = HoodieTableMetaClient.builder - .setConf(jsc.hadoopConfiguration) - .setBasePath(config.getBasePath) - .setLoadActiveTimelineOnLoad(false) - .setConsistencyGuardConfig(config.getConsistencyGuardConfig) - .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion))) - .build - - val activeTimeline = metaClient.getActiveTimeline - val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants - val filteredTimeline = completedTimeline.containsInstant(instantTime) - if (!filteredTimeline) { - throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline") + var client: SparkRDDWriteClient[_] = null + try { + client = createHoodieClient(jsc, basePath) + client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false") + val config = getWriteConfig(basePath) + val metaClient = HoodieTableMetaClient.builder + .setConf(jsc.hadoopConfiguration) + .setBasePath(config.getBasePath) + .setLoadActiveTimelineOnLoad(false) + .setConsistencyGuardConfig(config.getConsistencyGuardConfig) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion))) + .build + + val activeTimeline = metaClient.getActiveTimeline + val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants + val filteredTimeline = completedTimeline.containsInstant(instantTime) + if (!filteredTimeline) { + throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline") + } + + val result = if (client.rollback(instantTime)) true else false + val outputRow = Row(result) + + Seq(outputRow) + } finally { + if (client != null) { + client.close() + } } - - val result = if (client.rollback(instantTime)) true else false - val outputRow = Row(result) - - Seq(outputRow) } override def build: Procedure = new RollbackToInstantTimeProcedure() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala index 36580176d0f..ca8b3fc95bc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.HoodieCLIUtils +import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.util.JsonUtils import org.apache.hudi.config.HoodieCleanConfig @@ -79,16 +80,24 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString, HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString ) - val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props) - val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking) - if (hoodieCleanMeta == null) Seq.empty - else Seq(Row(hoodieCleanMeta.getStartCleanTime, - hoodieCleanMeta.getTimeTakenInMillis, - hoodieCleanMeta.getTotalFilesDeleted, - hoodieCleanMeta.getEarliestCommitToRetain, - JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata), - hoodieCleanMeta.getVersion)) + var client: SparkRDDWriteClient[_] = null + try { + client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props) + val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking) + + if (hoodieCleanMeta == null) Seq.empty + else Seq(Row(hoodieCleanMeta.getStartCleanTime, + hoodieCleanMeta.getTimeTakenInMillis, + hoodieCleanMeta.getTotalFilesDeleted, + hoodieCleanMeta.getEarliestCommitToRetain, + JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata), + hoodieCleanMeta.getVersion)) + } finally { + if (client != null) { + client.close() + } + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala index 3c51d7d8b29..d79cf8c302f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi.command.procedures +import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.model.HoodieCommitMetadata import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} @@ -64,70 +65,77 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp val basePath = getBasePath(tableName, tablePath) val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build - val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty) - - var willCompactionInstants: Seq[String] = Seq.empty - operation match { - case "schedule" => - val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) - if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { - willCompactionInstants = Seq(instantTime) - } - case "run" => - // Do compaction - val timeLine = metaClient.getActiveTimeline - val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala - .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) - .map(_.getTimestamp) - .toSeq.sortBy(f => f) - willCompactionInstants = if (instantTimestamp.isEmpty) { - if (pendingCompactionInstants.nonEmpty) { - pendingCompactionInstants - } else { // If there are no pending compaction, schedule to generate one. - // CompactionHoodiePathCommand will return instanceTime for SCHEDULE. - val instantTime = HoodieActiveTimeline.createNewInstantTime() - if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { - Seq(instantTime) + + var client: SparkRDDWriteClient[_] = null + try { + client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty) + var willCompactionInstants: Seq[String] = Seq.empty + operation match { + case "schedule" => + val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) + if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { + willCompactionInstants = Seq(instantTime) + } + case "run" => + // Do compaction + val timeLine = metaClient.getActiveTimeline + val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala + .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) + .map(_.getTimestamp) + .toSeq.sortBy(f => f) + willCompactionInstants = if (instantTimestamp.isEmpty) { + if (pendingCompactionInstants.nonEmpty) { + pendingCompactionInstants + } else { // If there are no pending compaction, schedule to generate one. + // CompactionHoodiePathCommand will return instanceTime for SCHEDULE. + val instantTime = HoodieActiveTimeline.createNewInstantTime() + if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { + Seq(instantTime) + } else { + Seq.empty + } + } + } else { + // Check if the compaction timestamp has exists in the pending compaction + if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) { + Seq(instantTimestamp.get.toString) } else { - Seq.empty + throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " + + s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ") } } - } else { - // Check if the compaction timestamp has exists in the pending compaction - if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) { - Seq(instantTimestamp.get.toString) + + if (willCompactionInstants.isEmpty) { + logInfo(s"No need to compaction on $basePath") } else { - throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " + - s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ") - } - } - - if (willCompactionInstants.isEmpty) { - logInfo(s"No need to compaction on $basePath") - } else { - logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath") - val timer = HoodieTimer.start - willCompactionInstants.foreach { compactionInstant => - val writeResponse = client.compact(compactionInstant) - handleResponse(writeResponse.getCommitMetadata.get()) - client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty()) + logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath") + val timer = HoodieTimer.start + willCompactionInstants.foreach { compactionInstant => + val writeResponse = client.compact(compactionInstant) + handleResponse(writeResponse.getCommitMetadata.get()) + client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty()) + } + logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + + s" spend: ${timer.endTimer()}ms") } - logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + - s" spend: ${timer.endTimer()}ms") - } - case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") - } - - val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala - .filter(instant => willCompactionInstants.contains(instant.getTimestamp)) - .toSeq - .sortBy(p => p.getTimestamp) - .reverse - - compactionInstants.map(instant => - (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)) - ).map { case (instant, plan) => - Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name()) + case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") + } + + val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala + .filter(instant => willCompactionInstants.contains(instant.getTimestamp)) + .toSeq + .sortBy(p => p.getTimestamp) + .reverse + + compactionInstants.map(instant => + (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)) + ).map { case (instant, plan) => + Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name()) + } + } finally { + if (client != null) { + client.close() + } } }
