SteNicholas commented on code in PR #6991:
URL: https://github.com/apache/hudi/pull/6991#discussion_r1128990648


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java:
##########
@@ -394,7 +408,40 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
   @Override
   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException("dropPartition is not implemented.");
+    if (!tableExists(tablePath)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+    boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+    String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
+
+    if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    // enable auto-commit though ~
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
+      writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+      fs.delete(new Path(tablePathStr, partitionPathStr), true);

Review Comment:
   @TengHuo, the data files that remain invalid after dropping a partition are dirty data. Meanwhile, why should the `dropPartition` behavior keep consistency between Flink and Spark? If it should, IMO, there should be a unified interface to drop partitions.
   cc @voonhous 
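
   For illustration only (not part of this PR): a minimal sketch of what such a shared drop-partition helper could look like. The class and method names (`DropPartitionUtil#dropPartition`) are hypothetical; the body only factors out the same calls already used in the diff above (`deletePartitions`, `createNewInstantTime`, `fs.delete`).

   ```java
   // Hypothetical sketch, not part of this PR: a helper the catalog's
   // dropPartition path could delegate to, so the commit-then-cleanup behavior
   // lives in one place. All names here are made up for illustration.
   import java.io.IOException;
   import java.util.Collections;

   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hudi.client.HoodieFlinkWriteClient;
   import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
   import org.apache.hudi.exception.HoodieMetadataException;

   public final class DropPartitionUtil {

     private DropPartitionUtil() {
     }

     /**
      * Issues a delete-partition commit through the write client, then removes
      * the partition directory so no dirty data files are left behind.
      */
     public static void dropPartition(
         HoodieFlinkWriteClient<?> writeClient,
         FileSystem fs,
         String tablePathStr,
         String partitionPathStr) throws IOException {
       writeClient.deletePartitions(
               Collections.singletonList(partitionPathStr),
               HoodieActiveTimeline.createNewInstantTime())
           .forEach(writeStatus -> {
             if (writeStatus.hasErrors()) {
               throw new HoodieMetadataException(String.format(
                   "Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
             }
           });
       // Physically remove the partition directory, mirroring the fs.delete call in the diff.
       fs.delete(new Path(tablePathStr, partitionPathStr), true);
     }
   }
   ```

   A truly cross-engine version would have to be written against the engine-neutral write client rather than `HoodieFlinkWriteClient`, so this is only meant to make the suggestion concrete, not to prescribe the final API.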


