leesf commented on a change in pull request #2382:
URL: https://github.com/apache/hudi/pull/2382#discussion_r573683860



##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
+public class JavaListingBasedRollbackHelper implements Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(JavaListingBasedRollbackHelper.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final HoodieWriteConfig config;
+
+  public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, 
HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all rollback actions that we have collected in parallel.
+   */
+  public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, 
HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> 
rollbackRequests) {
+    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = 
maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true);
+
+    Map<String, List<Pair<String, HoodieRollbackStat>>> collect = 
partitionPathRollbackStatsPairs.entrySet()
+        .stream()
+        .map(x -> Pair.of(x.getKey(), 
x.getValue())).collect(Collectors.groupingBy(Pair::getLeft));
+    return collect.values().stream()
+        .map(pairs -> 
pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Collect all file info that needs to be rollbacked.
+   */
+  public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext 
context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> 
rollbackRequests) {
+    Map<String, HoodieRollbackStat> partitionPathRollbackStatsPairs = 
maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false);
+    return new ArrayList<>(partitionPathRollbackStatsPairs.values());
+  }
+
+  /**
+   * Deletes the files of interest and collects stats, or collects stats only, depending on {@code doDelete}.
+   *
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which 
deletion or collect stats is requested.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to 
be operated on.
+   * @param doDelete          {@code true} if deletion has to be done. {@code 
false} if only stats are to be collected w/o performing any deletes.
+   * @return stats collected with or w/o actual deletions.
+   */
+  Map<String, HoodieRollbackStat> 
maybeDeleteAndCollectStats(HoodieEngineContext context,
+                                                             HoodieInstant 
instantToRollback,
+                                                             
List<ListingBasedRollbackRequest> rollbackRequests,
+                                                             boolean doDelete) 
{
+    return context.mapToPair(rollbackRequests, rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final Map<FileStatus, Boolean> filesToDeletedStatus = 
deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), doDelete);
+          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+              
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case DELETE_DATA_AND_LOG_FILES: {
+          final Map<FileStatus, Boolean> filesToDeletedStatus = 
deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), 
rollbackRequest.getPartitionPath(), doDelete);
+          return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
+              
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withDeletedFileResults(filesToDeletedStatus).build());
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          HoodieLogFormat.Writer writer = null;
+          try {
+            writer = HoodieLogFormat.newWriterBuilder()
+                
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), 
rollbackRequest.getPartitionPath()))
+                .withFileId(rollbackRequest.getFileId().get())
+                
.overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
+                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+            // generate metadata
+            if (doDelete) {
+              Map<HoodieLogBlock.HeaderMetadataType, String> header = 
generateHeader(instantToRollback.getTimestamp());
+              // if update belongs to an existing log file
+              writer.appendBlock(new HoodieCommandBlock(header));
+            }
+          } catch (IOException | InterruptedException io) {
+            throw new HoodieRollbackException("Failed to rollback for instant 
" + instantToRollback, io);
+          } finally {
+            try {
+              if (writer != null) {
+                writer.close();
+              }
+            } catch (IOException io) {
+              throw new HoodieIOException("Error appending rollback block..", 
io);
+            }
+          }
+
+          // This step is intentionally done after writer is closed. 
Guarantees that
+          // getFileStatus would reflect correct stats and 
FileNotFoundException is not thrown in
+          // cloud-storage : HUDI-168
+          Map<FileStatus, Long> filesToNumBlocksRollback = 
Collections.singletonMap(
+              
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+              1L

Review comment:
       move above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to