leesf commented on a change in pull request #2808:
URL: https://github.com/apache/hudi/pull/2808#discussion_r611597613



##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -431,32 +470,49 @@ public String getLastCompletedInstant(String tableType) {
         .orElse(null);
   }
 
-  public void deletePendingInstant(String tableType, String instant) {
-    HoodieFlinkTable<T> table = getHoodieTable();
-    String commitType = 
CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    HoodieActiveTimeline activeTimeline = 
table.getMetaClient().getActiveTimeline();
-    activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, 
commitType, instant);
-    activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, 
commitType, instant);
-  }
-
-  public void transitionRequestedToInflight(String tableType, String 
inFlightInstant) {
+  public void transitionRequestedToInflight(String commitType, String 
inFlightInstant) {
     HoodieFlinkTable<T> table = getHoodieTable();
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    String commitType = 
CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
     HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, 
commitType, inFlightInstant);
     activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
   }
 
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
+  public HoodieFlinkTable<T> getHoodieTable() {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  }
+
+  public Map<String, List<String>> getPartitionToReplacedFileIds(
+      WriteOperationType writeOperationType,
+      List<WriteStatus> writeStatuses) {
     HoodieFlinkTable<T> table = getHoodieTable();
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightCompaction(inflightInstant, table);
+    switch (writeOperationType) {
+      case INSERT_OVERWRITE:
+        return writeStatuses.stream().map(status -> 
status.getStat().getPartitionPath()).distinct()
+            .collect(
+                Collectors.toMap(
+                    partition -> partition,
+                    partitionPath -> getAllExistingFileIds(table, 
partitionPath)));
+      case INSERT_OVERWRITE_TABLE:
+        Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+        List<String> partitionPaths =
+            FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), 
table.getMetaClient().getBasePath());
+        if (partitionPaths != null && partitionPaths.size() > 0) {
+          context.setJobStatus(this.getClass().getSimpleName(), "Getting 
ExistingFileIds of all partitions");
+          partitionToExistingFileIds = partitionPaths.stream().parallel()
+              .collect(
+                  Collectors.toMap(
+                      partition -> partition,
+                      partition -> getAllExistingFileIds(table, partition)));
+        }
+        return partitionToExistingFileIds;
+      default:
+        throw new AssertionError();

Review comment:
       Should this throw a HoodieNotSupportedException or HoodieException instead of a bare AssertionError?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to