This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f6f15c3c7 [HUDI-5049] Supports dropPartition for Flink catalog (#6991)
4f6f15c3c7 is described below

commit 4f6f15c3c761621eaaa1b3b52e0c2841626afe53
Author: Nicholas Jiang <[email protected]>
AuthorDate: Tue Oct 25 11:24:32 2022 +0800

    [HUDI-5049] Supports dropPartition for Flink catalog (#6991)
    
    * for both dfs and hms catalogs
---
 .../java/org/apache/hudi/table/HoodieTable.java    |   2 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   8 ++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   5 +-
 .../FlinkDeletePartitionCommitActionExecutor.java  | 101 +++++++++++++++++++++
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  66 +++++++++++++-
 .../hudi/table/catalog/HoodieCatalogUtil.java      |  59 ++++++++++++
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  69 +++++++++++++-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  17 ++++
 .../hudi/table/catalog/TestHoodieCatalog.java      |  55 ++++++++++-
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  |  62 +++++++++++++
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |   7 +-
 11 files changed, 440 insertions(+), 11 deletions(-)

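
The change wires partition dropping into the Flink Catalog API for both the filesystem-based HoodieCatalog and the Hive-metastore-based HoodieHiveCatalog. As a rough orientation before the diff, a minimal usage sketch follows; the catalog path, database, table, and partition values are illustrative placeholders, not taken from this patch, and the setup mirrors what the new tests do:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.hudi.table.catalog.CatalogOptions;
    import org.apache.hudi.table.catalog.HoodieCatalog;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class DropPartitionSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical catalog root and database; point these at an existing Hudi catalog to try it.
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put(CatalogOptions.CATALOG_PATH.key(), "/tmp/hudi_catalog");
        catalogOptions.put(CatalogOptions.DEFAULT_DATABASE.key(), "default");
        HoodieCatalog catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
        catalog.open();

        ObjectPath tablePath = new ObjectPath("default", "tb1");
        CatalogPartitionSpec spec =
            new CatalogPartitionSpec(Collections.singletonMap("partition", "par1"));
        // Writes a DELETE_PARTITION replace commit on the Hudi timeline,
        // then removes the partition directory under the table path.
        catalog.dropPartition(tablePath, spec, false);
        catalog.close();
      }
    }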
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 4a6f4ae1f4..d94d229ff5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -210,7 +210,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   * @param partitions   {@link List} of partition to be deleted
   * @return HoodieWriteMetadata
   */
-  public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
+  public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
 
   /**
   * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 191eb003b9..a00c361fde 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -254,6 +254,14 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     return postWrite(result, instantTime, table);
   }
 
+  public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
+    preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
+    HoodieWriteMetadata<List<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+    return postWrite(result, instantTime, table);
+  }
+
   @Override
   public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
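
Both catalogs drive the drop through this new write-client entry point. A minimal sketch of that call (the write client is assumed to have been built for an existing table, e.g. via StreamerUtil.createWriteClient):

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

    import java.util.Collections;
    import java.util.List;

    /** Hedged sketch: drop one partition through the write client and fail fast on any write error. */
    public class DeletePartitionCall {
      public static void dropOnePartition(HoodieFlinkWriteClient<?> writeClient, String partitionPath) {
        List<WriteStatus> statuses = writeClient.deletePartitions(
            Collections.singletonList(partitionPath),          // partition paths to drop
            HoodieActiveTimeline.createNewInstantTime());      // instant for the replace commit
        for (WriteStatus status : statuses) {
          if (status.hasErrors()) {
            throw new IllegalStateException("Partition delete reported errors for file id " + status.getFileId());
          }
        }
      }
    }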
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 7d2be6cb93..38e3918bd4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -57,6 +57,7 @@ import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
@@ -243,8 +244,8 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
   }
 
   @Override
-  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
-    throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
+  public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    return new FlinkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
new file mode 100644
index 0000000000..a301ba228e
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends FlinkInsertOverwriteCommitActionExecutor<T> {
+
+  private final List<String> partitions;
+
+  public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config,
+                                                  HoodieTable<?, ?, ?, ?> table,
+                                                  String instantTime,
+                                                  List<String> partitions) {
+    super(context, null, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
+    this.partitions = partitions;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    try {
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
+      Map<String, List<String>> partitionToReplaceFileIds =
+          context.parallelize(partitions).distinct().collectAsList()
+              .stream().collect(Collectors.toMap(partitionPath -> partitionPath, this::getAllExistingFileIds));
+      HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+      result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+      result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+      result.setWriteStatuses(Collections.emptyList());
+
+      // created requested
+      HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
+      if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(),
+          dropPartitionsInstant.getFileName()))) {
+        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+            .setOperationType(WriteOperationType.DELETE_PARTITION.name())
+            .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
+            .build();
+        table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant,
+            TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+      }
+
+      this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())),
+          instantTime);
+      this.commitOnAutoCommit(result);
+      return result;
+    } catch (Exception e) {
+      throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
+    }
+  }
+
+  private List<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all existing file Ids as old files
+    return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
+  }
+}
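
The executor above does not rewrite or delete data files itself; it marks every latest file slice of each dropped partition as replaced in a replacecommit. Purely for illustration, the mapping it builds has this shape (partition names and file ids below are made up):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReplaceFileIdsShape {
      public static void main(String[] args) {
        // Illustrative only: partitionToReplaceFileIds after dropping two partitions
        // that each hold two file groups.
        Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
        partitionToReplaceFileIds.put("par1", Arrays.asList("file-id-0001", "file-id-0002"));
        partitionToReplaceFileIds.put("par2", Arrays.asList("file-id-0003", "file-id-0004"));
        partitionToReplaceFileIds.forEach((partition, fileIds) ->
            System.out.println(partition + " -> " + fileIds));
      }
    }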
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 956d61cc3c..13f628347f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -18,15 +18,20 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -382,7 +387,16 @@ public class HoodieCatalog extends AbstractCatalog {
 
   @Override
  public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
-    return false;
+    if (!tableExists(tablePath)) {
+      return false;
+    }
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+    boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+    return StreamerUtil.partitionExists(
+        inferTablePath(catalogPathStr, tablePath),
+        HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec),
+        hadoopConf);
   }
 
   @Override
@@ -394,7 +408,40 @@ public class HoodieCatalog extends AbstractCatalog {
   @Override
  public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
      throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException("dropPartition is not implemented.");
+    if (!tableExists(tablePath)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+    boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+    String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
+
+    if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    // enable auto-commit though ~
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
+      writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+      fs.delete(new Path(tablePathStr, partitionPathStr), true);
+    } catch (Exception e) {
+      throw new CatalogException(String.format("Dropping partition %s of table %s exception.", partitionPathStr, tablePath), e);
+    }
   }
 
   @Override
@@ -505,7 +552,20 @@ public class HoodieCatalog extends AbstractCatalog {
     return newOptions;
   }
 
-  private String inferTablePath(String catalogPath, ObjectPath tablePath) {
+  private HoodieFlinkWriteClient<?> createWriteClient(
+      Map<String, String> options,
+      String tablePathStr,
+      ObjectPath tablePath) throws IOException {
+    return StreamerUtil.createWriteClient(
+        Configuration.fromMap(options)
+            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+            .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
+                StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
+                    .getTableConfig().getTableCreateSchema().get().toString()));
+  }
+
+  @VisibleForTesting
+  protected String inferTablePath(String catalogPath, ObjectPath tablePath) {
     return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index 3dc191afb4..3b2d697a98 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -21,8 +21,12 @@ package org.apache.hudi.table.catalog;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,9 +39,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
@@ -113,4 +119,57 @@ public class HoodieCatalogUtil {
     }
     return Collections.emptyList();
   }
+
+  /**
+   * Returns the partition path with given {@link CatalogPartitionSpec}.
+   */
+  public static String inferPartitionPath(boolean hiveStylePartitioning, CatalogPartitionSpec catalogPartitionSpec) {
+    return catalogPartitionSpec.getPartitionSpec().entrySet()
+        .stream().map(entry ->
+            hiveStylePartitioning
+                ? String.format("%s=%s", entry.getKey(), entry.getValue())
+                : entry.getValue())
+        .collect(Collectors.joining("/"));
+  }
+
+  /**
+   * Returns a list of ordered partition values by re-arranging them based on the given list of
+   * partition keys. If the partition value is null, it'll be converted into default partition
+   * name.
+   *
+   * @param partitionSpec The partition spec
+   * @param partitionKeys The partition keys
+   * @param tablePath     The table path
+   * @return A list of partition values ordered by partition keys
+   * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
+   *     different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
+   */
+  @VisibleForTesting
+  public static List<String> getOrderedPartitionValues(
+      String catalogName,
+      HiveConf hiveConf,
+      CatalogPartitionSpec partitionSpec,
+      List<String> partitionKeys,
+      ObjectPath tablePath)
+      throws PartitionSpecInvalidException {
+    Map<String, String> spec = partitionSpec.getPartitionSpec();
+    if (spec.size() != partitionKeys.size()) {
+      throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+    }
+
+    List<String> values = new ArrayList<>(spec.size());
+    for (String key : partitionKeys) {
+      if (!spec.containsKey(key)) {
+        throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+      } else {
+        String value = spec.get(key);
+        if (value == null) {
+          value = hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+        }
+        values.add(value);
+      }
+    }
+
+    return values;
+  }
 }
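
The partition path helper mirrors the table's hive-style partitioning setting: key=value segments when it is enabled, bare values otherwise. A small sketch (the partition key "dt" and its value are assumptions for the example):

    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.hudi.table.catalog.HoodieCatalogUtil;

    import java.util.Collections;

    public class InferPartitionPathExample {
      public static void main(String[] args) {
        CatalogPartitionSpec partitionSpec =
            new CatalogPartitionSpec(Collections.singletonMap("dt", "20221020"));
        // hive-style partitioning joins key=value segments: prints "dt=20221020"
        System.out.println(HoodieCatalogUtil.inferPartitionPath(true, partitionSpec));
        // otherwise only the values are kept: prints "20221020"
        System.out.println(HoodieCatalogUtil.inferPartitionPath(false, partitionSpec));
      }
    }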
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 01b73b8605..c0cd386793 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -18,14 +18,18 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -74,6 +78,7 @@ import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -488,7 +493,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
   }
 
-  private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
+  @VisibleForTesting
+  public String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
     String location = table.getOptions().getOrDefault(PATH.key(), "");
     if (StringUtils.isNullOrEmpty(location)) {
       try {
@@ -777,7 +783,47 @@ public class HoodieHiveCatalog extends AbstractCatalog {
   public void dropPartition(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
      throws PartitionNotExistException, CatalogException {
-    throw new HoodieCatalogException("Not supported.");
+    checkNotNull(tablePath, "Table path cannot be null");
+    checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+    final CatalogBaseTable table;
+    try {
+      table = getTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+      } else {
+        return;
+      }
+    }
+    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, table)) {
+      boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
+      writeClient.deletePartitions(
+          Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
+              HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+
+      client.dropPartition(
+          tablePath.getDatabaseName(),
+          tablePath.getObjectName(),
+          HoodieCatalogUtil.getOrderedPartitionValues(
+              getName(), getHiveConf(), partitionSpec, ((CatalogTable) table).getPartitionKeys(), tablePath),
+          true);
+    } catch (NoSuchObjectException e) {
+      if (!ignoreIfNotExists) {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+      }
+    } catch (MetaException | PartitionSpecInvalidException e) {
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+    } catch (Exception e) {
+      throw new CatalogException(
+          String.format(
+              "Failed to drop partition %s of table %s", partitionSpec, tablePath));
+    }
   }
 
   @Override
@@ -906,4 +952,23 @@ public class HoodieHiveCatalog extends AbstractCatalog {
       return newOptions;
     }
   }
+
+  private HoodieFlinkWriteClient<?> createWriteClient(
+      ObjectPath tablePath,
+      CatalogBaseTable table) throws Exception {
+    Map<String, String> options = table.getOptions();
+    // enable auto-commit though ~
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    return StreamerUtil.createWriteClient(
+        Configuration.fromMap(options)
+            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+            .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
+                HoodieTableMetaClient.builder().setBasePath(inferTablePath(tablePath, table)).setConf(hiveConf).build()
+                    .getTableConfig().getTableCreateSchema().get().toString()));
+  }
+
+  @VisibleForTesting
+  public IMetaStoreClient getClient() {
+    return client;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 86be092ce8..29c0e2c061 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -345,6 +345,23 @@ public class StreamerUtil {
     }
   }
 
+  /**
+   * Returns whether the hoodie partition exists under given table path {@code tablePath} and partition path {@code partitionPath}.
+   *
+   * @param tablePath     Base path of the table.
+   * @param partitionPath The path of the partition.
+   * @param hadoopConf    The hadoop configuration.
+   */
+  public static boolean partitionExists(String tablePath, String partitionPath, org.apache.hadoop.conf.Configuration hadoopConf) {
+    // Hadoop FileSystem
+    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+    try {
+      return fs.exists(new Path(tablePath, partitionPath));
+    } catch (IOException e) {
+      throw new HoodieException(String.format("Error while checking whether partition exists under table path [%s] and partition path [%s]", tablePath, partitionPath), e);
+    }
+  }
+
   /**
    * Generates the bucket ID using format {partition path}_{fileID}.
    */
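
The DFS catalog uses this helper in partitionExists and dropPartition to decide whether a call should be a no-op or raise PartitionNotExistException. A minimal sketch of calling it directly (the table and partition paths are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.util.StreamerUtil;

    public class PartitionExistsCheck {
      public static void main(String[] args) {
        // Assumed local paths for illustration; point them at a real Hudi table to try it.
        String tablePath = "/tmp/hudi/tb1";
        String partitionPath = "par1";
        boolean exists = StreamerUtil.partitionExists(tablePath, partitionPath, new Configuration());
        System.out.println("partition exists: " + exists);
      }
    }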
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 8e23ef9d63..e7736321b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -18,9 +18,19 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
@@ -30,6 +40,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -39,6 +50,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -57,8 +69,12 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
 import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -119,6 +135,7 @@ public class TestHoodieCatalog {
   );
 
   private TableEnvironment streamTableEnv;
+  private String catalogPathStr;
   private HoodieCatalog catalog;
 
   @TempDir
@@ -133,7 +150,8 @@ public class TestHoodieCatalog {
     File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
     testDb.mkdir();
     Map<String, String> catalogOptions = new HashMap<>();
-    catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+    catalogPathStr = tempFile.getAbsolutePath();
+    catalogOptions.put(CATALOG_PATH.key(), catalogPathStr);
     catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
     catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
     catalog.open();
@@ -256,7 +274,7 @@ public class TestHoodieCatalog {
   }
 
   @Test
-  public void dropTable() throws Exception {
+  public void testDropTable() throws Exception {
     ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
     // create table
     catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
@@ -269,4 +287,37 @@ public class TestHoodieCatalog {
     assertThrows(TableNotExistException.class,
         () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false));
   }
+
+  @Test
+  public void testDropPartition() throws Exception {
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+    // create table
+    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+    CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(new HashMap<String, String>() {
+      {
+        put("partition", "par1");
+      }
+    });
+    // drop non-exist partition
+    assertThrows(PartitionNotExistException.class,
+        () -> catalog.dropPartition(tablePath, partitionSpec, false));
+
+    String tablePathStr = catalog.inferTablePath(catalogPathStr, tablePath);
+    Configuration flinkConf = TestConfigurations.getDefaultConf(tablePathStr);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePathStr, HadoopConfigurations.getHadoopConf(flinkConf));
+    TestData.writeData(TestData.DATA_SET_INSERT, flinkConf);
+    assertTrue(catalog.partitionExists(tablePath, partitionSpec));
+
+    // drop partition 'par1'
+    catalog.dropPartition(tablePath, partitionSpec, false);
+
+    HoodieInstant latestInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().orElse(null);
+    assertNotNull(latestInstant, "Delete partition commit should be completed");
+    HoodieCommitMetadata commitMetadata = WriteProfiles.getCommitMetadata("tb1", new Path(tablePathStr), latestInstant, metaClient.getActiveTimeline());
+    assertThat(commitMetadata, instanceOf(HoodieReplaceCommitMetadata.class));
+    HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
+    assertThat(replaceCommitMetadata.getPartitionToReplaceFileIds().size(), is(1));
+    assertFalse(catalog.partitionExists(tablePath, partitionSpec));
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index ffae71d6b2..5d27cdadbb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -19,24 +19,34 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -53,8 +63,13 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -210,4 +225,51 @@ public class TestHoodieHiveCatalog {
 
     hoodieCatalog.renameTable(new ObjectPath("default", "test1"), "test", false);
   }
+
+  @Test
+  public void testDropPartition() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    CatalogTable table =
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
+    hoodieCatalog.createTable(tablePath, table, false);
+
+    CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(new HashMap<String, String>() {
+      {
+        put("par1", "20221020");
+      }
+    });
+    // drop non-exist partition
+    assertThrows(PartitionNotExistException.class,
+        () -> hoodieCatalog.dropPartition(tablePath, partitionSpec, false));
+
+    Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
+    StorageDescriptor partitionSd = new StorageDescriptor(hiveTable.getSd());
+    partitionSd.setLocation(new Path(partitionSd.getLocation(), HoodieCatalogUtil.inferPartitionPath(true, partitionSpec)).toString());
+    hoodieCatalog.getClient().add_partition(new Partition(Collections.singletonList("20221020"),
+        tablePath.getDatabaseName(), tablePath.getObjectName(), 0, 0, partitionSd, null));
+    assertNotNull(getHivePartition(partitionSpec));
+
+    // drop partition 'par1'
+    hoodieCatalog.dropPartition(tablePath, partitionSpec, false);
+
+    String tablePathStr = hoodieCatalog.inferTablePath(tablePath, hoodieCatalog.getTable(tablePath));
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePathStr, hoodieCatalog.getHiveConf());
+    HoodieInstant latestInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().orElse(null);
+    assertNotNull(latestInstant, "Delete partition commit should be completed");
+    HoodieCommitMetadata commitMetadata = WriteProfiles.getCommitMetadata(tablePath.getObjectName(), new org.apache.flink.core.fs.Path(tablePathStr),
+        latestInstant, metaClient.getActiveTimeline());
+    assertThat(commitMetadata, instanceOf(HoodieReplaceCommitMetadata.class));
+    HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
+    assertThat(replaceCommitMetadata.getPartitionToReplaceFileIds().size(), is(1));
+    assertThrows(NoSuchObjectException.class, () -> getHivePartition(partitionSpec));
+  }
+
+  private Partition getHivePartition(CatalogPartitionSpec partitionSpec) throws Exception {
+    return hoodieCatalog.getClient().getPartition(
+        tablePath.getDatabaseName(),
+        tablePath.getObjectName(),
+        HoodieCatalogUtil.getOrderedPartitionValues(
+            hoodieCatalog.getName(), hoodieCatalog.getHiveConf(), partitionSpec, partitions, tablePath));
+  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 7b47ffa75f..7c5e76b2bd 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -509,6 +510,10 @@ public class HoodieInputFormatUtils {
       HoodieInstant instant,
       HoodieTimeline timeline) throws IOException {
     byte[] data = timeline.getInstantDetails(instant).get();
-    return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class);
+    } else {
+      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    }
   }
 }
