This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1abdb5787748aa2dde56f7c9d8d06f91ae2b5119
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Sep 27 12:02:35 2022 -0700

    [HUDI-4848] Fixing repair deprecated partition tool (#6731)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index e43a5d037e..6649eaf766 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -59,6 +59,7 @@ import org.apache.hudi.utilities.HoodieCompactor;
 import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -456,8 +458,15 @@ public class SparkMain {
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
       Map<String, String> propsMap = getPropsForRewrite(metaClient);
       rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
-      // after re-writing, we can safely delete older partition.
+      // after re-writing, we can safely delete the older partition.
       deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
+      // also, we can physically delete the old partition.
+      FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
+      try {
+        fs.delete(new Path(basePath, oldPartition), true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete older partition " + oldPartition + " under " + basePath, e);
+      }
     }
     return 0;
   }
@@ -473,10 +482,14 @@ public class SparkMain {
   }
 
   private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
-    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+    String partitionFieldProp = metaClient.getTableConfig().getPartitionFieldProp();
+    StructType structType = recordsToRewrite.schema();
+    int partitionIndex = structType.fieldIndex(partitionFieldProp);
+
+    recordsToRewrite.withColumn(partitionFieldProp, functions.lit(null).cast(structType.apply(partitionIndex).dataType()))
         .write()
         .options(propsMap)
-        .option("hoodie.datasource.write.operation", "insert")
+        .option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
         .format("hudi")
         .mode("Append")
         .save(basePath);
@@ -484,10 +497,8 @@ public class SparkMain {
   }
 
   private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
     return sqlContext.read()
-        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
         .format("hudi")
-        .load(basePath)
-        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+        .load(basePath + "/" + oldPartition)
         .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
         .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
        .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
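
For readers following the change, the repaired flow is: (1) read only the records
under the deprecated partition, (2) rewrite them with the partition column nulled
out (cast back to its original type) via bulk_insert so Hudi routes them to its
default partition, and (3) physically remove the old partition directory. Below is
a minimal, self-contained sketch of that flow against the Spark/Hudi datasource
API; it is not the committed tool itself, and the table path, table name, record
key field, and partition field name are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class RepairDeprecatedPartitionSketch {
  public static void main(String[] args) throws IOException {
    String basePath = "file:///tmp/hudi_trips";   // illustrative table location
    String oldPartition = "default";              // deprecated partition to repair
    String partitionField = "partition_path";     // assumed partition field of the table

    SparkSession spark = SparkSession.builder()
        .appName("repair-deprecated-partition-sketch")
        .master("local[2]")
        .getOrCreate();

    // 1. Read only the old partition's records and drop the Hudi metadata
    //    columns, mirroring getRecordsToRewrite() in the patch.
    Dataset<Row> recordsToRewrite = spark.read()
        .format("hudi")
        .load(basePath + "/" + oldPartition)
        .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
            "_hoodie_partition_path", "_hoodie_file_name");

    // 2. Null out the partition column (cast preserves its original type) and
    //    bulk_insert, so the rows land in Hudi's default partition.
    recordsToRewrite
        .withColumn(partitionField,
            functions.lit(null).cast(recordsToRewrite.schema().apply(partitionField).dataType()))
        .write()
        .format("hudi")
        .option("hoodie.table.name", "hudi_trips")                    // illustrative
        .option("hoodie.datasource.write.recordkey.field", "uuid")    // illustrative
        .option("hoodie.datasource.write.partitionpath.field", partitionField)
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .mode("Append")
        .save(basePath);

    // 3. Physically remove the old partition directory, as the patch now does.
    //    (The committed tool also issues a Hudi delete for the old records via
    //    deleteOlderPartition() before this step.)
    FileSystem fs = new Path(basePath).getFileSystem(new Configuration());
    if (!fs.delete(new Path(basePath, oldPartition), true)) {
      System.err.println("Failed to delete older partition " + oldPartition);
    }

    spark.stop();
  }
}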
