This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4f6f15c3c7 [HUDI-5049] Supports dropPartition for Flink catalog (#6991)
4f6f15c3c7 is described below
commit 4f6f15c3c761621eaaa1b3b52e0c2841626afe53
Author: Nicholas Jiang <[email protected]>
AuthorDate: Tue Oct 25 11:24:32 2022 +0800
[HUDI-5049] Supports dropPartition for Flink catalog (#6991)
* for both dfs and hms catalogs
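A minimal usage sketch of the new capability (illustrative only, mirroring the added TestHoodieCatalog test; the catalog instance and the database/table/partition names are placeholders):

    // assumes an opened HoodieCatalog (or HoodieHiveCatalog) backing an existing partitioned Hudi table
    ObjectPath tablePath = new ObjectPath("default", "tb1");
    CatalogPartitionSpec spec = new CatalogPartitionSpec(Collections.singletonMap("partition", "par1"));
    // triggers a DELETE_PARTITION replace commit via HoodieFlinkWriteClient and removes the partition metadata
    catalog.dropPartition(tablePath, spec, false);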
---
.../java/org/apache/hudi/table/HoodieTable.java | 2 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 8 ++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 +-
.../FlinkDeletePartitionCommitActionExecutor.java | 101 +++++++++++++++++++++
.../apache/hudi/table/catalog/HoodieCatalog.java | 66 +++++++++++++-
.../hudi/table/catalog/HoodieCatalogUtil.java | 59 ++++++++++++
.../hudi/table/catalog/HoodieHiveCatalog.java | 69 +++++++++++++-
.../java/org/apache/hudi/util/StreamerUtil.java | 17 ++++
.../hudi/table/catalog/TestHoodieCatalog.java | 55 ++++++++++-
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 62 +++++++++++++
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 7 +-
11 files changed, 440 insertions(+), 11 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 4a6f4ae1f4..d94d229ff5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -210,7 +210,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* @param partitions {@link List} of partition to be deleted
* @return HoodieWriteMetadata
*/
- public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
+ public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 191eb003b9..a00c361fde 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -254,6 +254,14 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
return postWrite(result, instantTime, table);
}
+ public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+ initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
+ preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
+ HoodieWriteMetadata<List<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+ return postWrite(result, instantTime, table);
+ }
+
@Override
public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 7d2be6cb93..38e3918bd4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -57,6 +57,7 @@ import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
@@ -243,8 +244,8 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
- public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
- throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
+ public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+ return new FlinkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
}
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
new file mode 100644
index 0000000000..a301ba228e
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends FlinkInsertOverwriteCommitActionExecutor<T> {
+
+ private final List<String> partitions;
+
+ public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<?, ?, ?, ?> table,
+ String instantTime,
+ List<String> partitions) {
+ super(context, null, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
+ this.partitions = partitions;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ try {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
+ Map<String, List<String>> partitionToReplaceFileIds =
+ context.parallelize(partitions).distinct().collectAsList()
+ .stream().collect(Collectors.toMap(partitionPath -> partitionPath, this::getAllExistingFileIds));
+ HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+ result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+ result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+ result.setWriteStatuses(Collections.emptyList());
+
+ // created requested
+ HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
+ if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(),
+ dropPartitionsInstant.getFileName()))) {
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.DELETE_PARTITION.name())
+ .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
+ .build();
+ table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant,
+ TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+ }
+
+ this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())),
+ instantTime);
+ this.commitOnAutoCommit(result);
+ return result;
+ } catch (Exception e) {
+ throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
+ }
+ }
+
+ private List<String> getAllExistingFileIds(String partitionPath) {
+ // because new commit is not complete. it is safe to mark all existing file Ids as old files
+ return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 956d61cc3c..13f628347f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -18,15 +18,20 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -382,7 +387,16 @@ public class HoodieCatalog extends AbstractCatalog {
@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
- return false;
+ if (!tableExists(tablePath)) {
+ return false;
+ }
+ String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+ Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+ boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+ return StreamerUtil.partitionExists(
+ inferTablePath(catalogPathStr, tablePath),
+ HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec),
+ hadoopConf);
}
@Override
@@ -394,7 +408,40 @@ public class HoodieCatalog extends AbstractCatalog {
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException("dropPartition is not implemented.");
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new PartitionNotExistException(getName(), tablePath,
catalogPartitionSpec);
+ }
+ }
+
+ String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+ Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+ boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+ String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
+
+ if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new PartitionNotExistException(getName(), tablePath,
catalogPartitionSpec);
+ }
+ }
+
+ // enable auto-commit though ~
+ options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+ try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
+ writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
+ .forEach(writeStatus -> {
+ if (writeStatus.hasErrors()) {
+ throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+ }
+ });
+ fs.delete(new Path(tablePathStr, partitionPathStr), true);
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Dropping partition %s of table %s exception.", partitionPathStr, tablePath), e);
+ }
}
@Override
@@ -505,7 +552,20 @@ public class HoodieCatalog extends AbstractCatalog {
return newOptions;
}
- private String inferTablePath(String catalogPath, ObjectPath tablePath) {
+ private HoodieFlinkWriteClient<?> createWriteClient(
+ Map<String, String> options,
+ String tablePathStr,
+ ObjectPath tablePath) throws IOException {
+ return StreamerUtil.createWriteClient(
+ Configuration.fromMap(options)
+ .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+ .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
+ StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
+ .getTableConfig().getTableCreateSchema().get().toString()));
+ }
+
+ @VisibleForTesting
+ protected String inferTablePath(String catalogPath, ObjectPath tablePath) {
return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index 3dc191afb4..3b2d697a98 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -21,8 +21,12 @@ package org.apache.hudi.table.catalog;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,9 +39,11 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
@@ -113,4 +119,57 @@ public class HoodieCatalogUtil {
}
return Collections.emptyList();
}
+
+ /**
+ * Returns the partition path with given {@link CatalogPartitionSpec}.
+ */
+ public static String inferPartitionPath(boolean hiveStylePartitioning, CatalogPartitionSpec catalogPartitionSpec) {
+ return catalogPartitionSpec.getPartitionSpec().entrySet()
+ .stream().map(entry ->
+ hiveStylePartitioning
+ ? String.format("%s=%s", entry.getKey(), entry.getValue())
+ : entry.getValue())
+ .collect(Collectors.joining("/"));
+ }
+
+ /**
+ * Returns a list of ordered partition values by re-arranging them based on the given list of
+ * partition keys. If the partition value is null, it'll be converted into default partition
+ * name.
+ *
+ * @param partitionSpec The partition spec
+ * @param partitionKeys The partition keys
+ * @param tablePath The table path
+ * @return A list of partition values ordered by partition keys
+ * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
+ * different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
+ */
+ @VisibleForTesting
+ public static List<String> getOrderedPartitionValues(
+ String catalogName,
+ HiveConf hiveConf,
+ CatalogPartitionSpec partitionSpec,
+ List<String> partitionKeys,
+ ObjectPath tablePath)
+ throws PartitionSpecInvalidException {
+ Map<String, String> spec = partitionSpec.getPartitionSpec();
+ if (spec.size() != partitionKeys.size()) {
+ throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ }
+
+ List<String> values = new ArrayList<>(spec.size());
+ for (String key : partitionKeys) {
+ if (!spec.containsKey(key)) {
+ throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+ } else {
+ String value = spec.get(key);
+ if (value == null) {
+ value = hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+ }
+ values.add(value);
+ }
+ }
+
+ return values;
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 01b73b8605..c0cd386793 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -18,14 +18,18 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.table.format.FilePathUtils;
@@ -74,6 +78,7 @@ import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -488,7 +493,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
}
}
- private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
+ @VisibleForTesting
+ public String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
String location = table.getOptions().getOrDefault(PATH.key(), "");
if (StringUtils.isNullOrEmpty(location)) {
try {
@@ -777,7 +783,47 @@ public class HoodieHiveCatalog extends AbstractCatalog {
public void dropPartition(
ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
- throw new HoodieCatalogException("Not supported.");
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+ final CatalogBaseTable table;
+ try {
+ table = getTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+ } else {
+ return;
+ }
+ }
+ try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, table)) {
+ boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
+ writeClient.deletePartitions(
+ Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
+ HoodieActiveTimeline.createNewInstantTime())
+ .forEach(writeStatus -> {
+ if (writeStatus.hasErrors()) {
+ throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+ }
+ });
+
+ client.dropPartition(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ HoodieCatalogUtil.getOrderedPartitionValues(
+ getName(), getHiveConf(), partitionSpec, ((CatalogTable) table).getPartitionKeys(), tablePath),
+ true);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+ }
+ } catch (MetaException | PartitionSpecInvalidException e) {
+ throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to drop partition %s of table %s", partitionSpec, tablePath));
+ }
}
@Override
@@ -906,4 +952,23 @@ public class HoodieHiveCatalog extends AbstractCatalog {
return newOptions;
}
}
+
+ private HoodieFlinkWriteClient<?> createWriteClient(
+ ObjectPath tablePath,
+ CatalogBaseTable table) throws Exception {
+ Map<String, String> options = table.getOptions();
+ // enable auto-commit though ~
+ options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+ return StreamerUtil.createWriteClient(
+ Configuration.fromMap(options)
+ .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+ .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
+ HoodieTableMetaClient.builder().setBasePath(inferTablePath(tablePath, table)).setConf(hiveConf).build()
+ .getTableConfig().getTableCreateSchema().get().toString()));
+ }
+
+ @VisibleForTesting
+ public IMetaStoreClient getClient() {
+ return client;
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 86be092ce8..29c0e2c061 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -345,6 +345,23 @@ public class StreamerUtil {
}
}
+ /**
+ * Returns whether the hoodie partition exists under given table path {@code tablePath} and partition path {@code partitionPath}.
+ *
+ * @param tablePath Base path of the table.
+ * @param partitionPath The path of the partition.
+ * @param hadoopConf The hadoop configuration.
+ */
+ public static boolean partitionExists(String tablePath, String partitionPath, org.apache.hadoop.conf.Configuration hadoopConf) {
+ // Hadoop FileSystem
+ FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+ try {
+ return fs.exists(new Path(tablePath, partitionPath));
+ } catch (IOException e) {
+ throw new HoodieException(String.format("Error while checking whether partition exists under table path [%s] and partition path [%s]", tablePath, partitionPath), e);
+ }
+ }
+
/**
* Generates the bucket ID using format {partition path}_{fileID}.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 8e23ef9d63..e7736321b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -18,9 +18,19 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
@@ -30,6 +40,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
@@ -39,6 +50,7 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -57,8 +69,12 @@ import java.util.stream.Collectors;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -119,6 +135,7 @@ public class TestHoodieCatalog {
);
private TableEnvironment streamTableEnv;
+ private String catalogPathStr;
private HoodieCatalog catalog;
@TempDir
@@ -133,7 +150,8 @@ public class TestHoodieCatalog {
File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
testDb.mkdir();
Map<String, String> catalogOptions = new HashMap<>();
- catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+ catalogPathStr = tempFile.getAbsolutePath();
+ catalogOptions.put(CATALOG_PATH.key(), catalogPathStr);
catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
catalog.open();
@@ -256,7 +274,7 @@ public class TestHoodieCatalog {
}
@Test
- public void dropTable() throws Exception {
+ public void testDropTable() throws Exception {
ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
// create table
catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
@@ -269,4 +287,37 @@ public class TestHoodieCatalog {
assertThrows(TableNotExistException.class,
() -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false));
}
+
+ @Test
+ public void testDropPartition() throws Exception {
+ ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+ // create table
+ catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+ CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(new HashMap<String, String>() {
+ {
+ put("partition", "par1");
+ }
+ });
+ // drop non-exist partition
+ assertThrows(PartitionNotExistException.class,
+ () -> catalog.dropPartition(tablePath, partitionSpec, false));
+
+ String tablePathStr = catalog.inferTablePath(catalogPathStr, tablePath);
+ Configuration flinkConf = TestConfigurations.getDefaultConf(tablePathStr);
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePathStr, HadoopConfigurations.getHadoopConf(flinkConf));
+ TestData.writeData(TestData.DATA_SET_INSERT, flinkConf);
+ assertTrue(catalog.partitionExists(tablePath, partitionSpec));
+
+ // drop partition 'par1'
+ catalog.dropPartition(tablePath, partitionSpec, false);
+
+ HoodieInstant latestInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().orElse(null);
+ assertNotNull(latestInstant, "Delete partition commit should be completed");
+ HoodieCommitMetadata commitMetadata = WriteProfiles.getCommitMetadata("tb1", new Path(tablePathStr), latestInstant, metaClient.getActiveTimeline());
+ assertThat(commitMetadata, instanceOf(HoodieReplaceCommitMetadata.class));
+ HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
+ assertThat(replaceCommitMetadata.getPartitionToReplaceFileIds().size(), is(1));
+ assertFalse(catalog.partitionExists(tablePath, partitionSpec));
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index ffae71d6b2..5d27cdadbb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -19,24 +19,34 @@
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -53,8 +63,13 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -210,4 +225,51 @@ public class TestHoodieHiveCatalog {
hoodieCatalog.renameTable(new ObjectPath("default", "test1"), "test", false);
}
+
+ @Test
+ public void testDropPartition() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+ CatalogTable table =
+ new CatalogTableImpl(schema, partitions, options, "hudi table");
+ hoodieCatalog.createTable(tablePath, table, false);
+
+ CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(new HashMap<String, String>() {
+ {
+ put("par1", "20221020");
+ }
+ });
+ // drop non-exist partition
+ assertThrows(PartitionNotExistException.class,
+ () -> hoodieCatalog.dropPartition(tablePath, partitionSpec, false));
+
+ Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
+ StorageDescriptor partitionSd = new StorageDescriptor(hiveTable.getSd());
+ partitionSd.setLocation(new Path(partitionSd.getLocation(), HoodieCatalogUtil.inferPartitionPath(true, partitionSpec)).toString());
+ hoodieCatalog.getClient().add_partition(new Partition(Collections.singletonList("20221020"),
+ tablePath.getDatabaseName(), tablePath.getObjectName(), 0, 0, partitionSd, null));
+ assertNotNull(getHivePartition(partitionSpec));
+
+ // drop partition 'par1'
+ hoodieCatalog.dropPartition(tablePath, partitionSpec, false);
+
+ String tablePathStr = hoodieCatalog.inferTablePath(tablePath, hoodieCatalog.getTable(tablePath));
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePathStr, hoodieCatalog.getHiveConf());
+ HoodieInstant latestInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().orElse(null);
+ assertNotNull(latestInstant, "Delete partition commit should be completed");
+ HoodieCommitMetadata commitMetadata = WriteProfiles.getCommitMetadata(tablePath.getObjectName(), new org.apache.flink.core.fs.Path(tablePathStr),
+ latestInstant, metaClient.getActiveTimeline());
+ assertThat(commitMetadata, instanceOf(HoodieReplaceCommitMetadata.class));
+ HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
+ assertThat(replaceCommitMetadata.getPartitionToReplaceFileIds().size(), is(1));
+ assertThrows(NoSuchObjectException.class, () -> getHivePartition(partitionSpec));
+ }
+
+ private Partition getHivePartition(CatalogPartitionSpec partitionSpec) throws Exception {
+ return hoodieCatalog.getClient().getPartition(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ HoodieCatalogUtil.getOrderedPartitionValues(
+ hoodieCatalog.getName(), hoodieCatalog.getHiveConf(), partitionSpec, partitions, tablePath));
+ }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 7b47ffa75f..7c5e76b2bd 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -509,6 +510,10 @@ public class HoodieInputFormatUtils {
HoodieInstant instant,
HoodieTimeline timeline) throws IOException {
byte[] data = timeline.getInstantDetails(instant).get();
- return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+ if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ return HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class);
+ } else {
+ return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+ }
}
}