TengHuo commented on code in PR #6991:
URL: https://github.com/apache/hudi/pull/6991#discussion_r1126150688
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java:
##########
@@ -394,7 +408,40 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
   @Override
   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException("dropPartition is not implemented.");
+    if (!tableExists(tablePath)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+    boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+    String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
+
+    if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    // enable auto-commit though ~
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
+      writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+      fs.delete(new Path(tablePathStr, partitionPathStr), true);
Review Comment:
Got it.
So, if I understand correctly, on the Spark side, invalid data files will remain in the dropped partition if there is an insert operation and no cleaner runs to remove those data files. Am I right?
And may I ask what exactly the invalid data files issue is? Is there a ticket for it?
Voon and I are checking the code for `drop partitions`; we may fix it if we find a problem.
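For context on the diff above: the partition path handed to `deletePartitions` and `fs.delete` comes from `HoodieCatalogUtil.inferPartitionPath`. Below is a minimal sketch of that derivation, assuming the helper joins the spec's entries in partition-column order with `/` and emits `key=value` segments only when hive-style partitioning is enabled. `PartitionPathSketch` and its plain `Map` input are hypothetical stand-ins for Hudi's helper and Flink's `CatalogPartitionSpec`, not the actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieCatalogUtil.inferPartitionPath (assumed behavior).
final class PartitionPathSketch {

  // Builds a relative partition path from an ordered spec (partition column -> value).
  // Hive-style: "col1=v1/col2=v2"; otherwise: "v1/v2".
  static String inferPartitionPath(boolean hiveStylePartitioning, Map<String, String> spec) {
    return spec.entrySet().stream()
        .map(e -> hiveStylePartitioning ? e.getKey() + "=" + e.getValue() : e.getValue())
        .collect(Collectors.joining("/"));
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order, standing in for partition-column order.
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("dt", "2023-03-06");
    spec.put("hh", "10");
    System.out.println(inferPartitionPath(true, spec));  // prints "dt=2023-03-06/hh=10"
    System.out.println(inferPartitionPath(false, spec)); // prints "2023-03-06/10"
  }
}
```

The same spec maps to two different directories depending on the flag, so reading it from the stored table options (defaulting to `false`, as the diff does) is what keeps `partitionExists` and `fs.delete` pointed at the directory the writer actually created.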