[ https://issues.apache.org/jira/browse/HADOOP-18013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603277#comment-17603277 ]

ASF GitHub Bot commented on HADOOP-18013:
-----------------------------------------

xinglin commented on code in PR #4729:
URL: https://github.com/apache/hadoop/pull/4729#discussion_r969020224


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CloudStoreTrashPolicy.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.Time;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.bindToDurationTrackerFactory;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_CREATE_CHECKPOINT;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_DELETE_CHECKPOINT;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_MOVE_TO_TRASH;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
+
+/**
+ * A Cloud Store Trash Policy designed to be resilient to
+ * race conditions and configurable to automatically clean up
+ * the current user's older checkpoints whenever invoked.
+ *
+ *
+ * The duration of trash operations is tracked in
+ * the target FileSystem's statistics, if it is configured
+ * to track these statistics:
+ * <ul>
+ *   <li>{@link org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_CREATE_CHECKPOINT}</li>
+ *   <li>{@link org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_DELETE_CHECKPOINT}</li>
+ *   <li>{@link org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_MOVE_TO_TRASH}</li>
+ * </ul>
+ */
+public class CloudStoreTrashPolicy extends TrashPolicyDefault {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CloudStoreTrashPolicy.class);
+
+  /**
+   * Configuration option to clean up old trash: {@value}.
+   */
+  public static final String CLEANUP_OLD_CHECKPOINTS =
+      "fs.trash.cleanup.old.checkpoints";
+
+  /**
+   * Default value of {@link #CLEANUP_OLD_CHECKPOINTS}: {@value}.
+   * This still requires the cleanup interval to be &gt; 0.
+   */
+  public static final boolean CLEANUP_OLD_CHECKPOINTS_DEFAULT = true;
+
+  /**
+   * Should old trash be cleaned up?
+   */
+  private boolean cleanupOldTrash;
+
+  /**
+   * Duration tracker if the FS provides one through its statistics.
+   */
+  private DurationTrackerFactory durationTrackerFactory;
+
+  public CloudStoreTrashPolicy() {
+  }
+
+  public boolean cleanupOldTrash() {
+    return cleanupOldTrash;
+  }
+
+  public DurationTrackerFactory getDurationTrackerFactory() {
+    return durationTrackerFactory;
+  }
+
+  /**
+   * Set the duration tracker factory; useful for testing.
+   * @param durationTrackerFactory factory.
+   */
+  public void setDurationTrackerFactory(
+      final DurationTrackerFactory durationTrackerFactory) {
+    this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+  }
+
+  @Override
+  public void initialize(final Configuration conf, final FileSystem fs) {
+    super.initialize(conf, fs);
+    cleanupOldTrash = getDeletionInterval() > 0
+        && conf.getBoolean(CLEANUP_OLD_CHECKPOINTS,
+        CLEANUP_OLD_CHECKPOINTS_DEFAULT);
+    // get any duration tracker
+    setDurationTrackerFactory(bindToDurationTrackerFactory(fs));
+  }
+
+  @Override
+  public boolean moveToTrash(Path path) throws IOException {
+    if (!isEnabled()) {
+      return false;
+    }
+    boolean moved;
+
+    if (!path.isAbsolute()) {
+      // make path absolute
+      path = new Path(fs.getWorkingDirectory(), path);
+    }
+    if (!fs.exists(path)) {
+      // path doesn't actually exist.
+      LOG.info("'{} was deleted before it could be moved to trash", path);
+      moved = true;
+    } else {
+
+      try (DurationInfo info = new DurationInfo(LOG, true, "moveToTrash(%s)",
+          path)) {
+
+        // copy needed: path must be effectively final for the lambda.
+        Path p = path;
+        moved = invokeTrackingDuration(
+            durationTrackerFactory.trackDuration(TRASH_MOVE_TO_TRASH), () ->
+                super.moveToTrash(p));
+
+      } catch (IOException e) {
+        if (!fs.exists(path)) {
+          // race condition with the trash setup; something else moved it.
+          // note that checking for FNFE is not sufficient as this may occur in
+          // the rename, at which point the exception may get downgraded.
+          LOG.info("'{} was deleted before it could be moved to trash", path);
+          LOG.debug("IOE raised on moveToTrash({})", path, e);
+          // report success
+          moved = true;
+        } else {
+          // source path still exists, so throw the exception and skip cleanup
+          // don't bother trying to cleanup here as it will only complicate
+          // error reporting
+          throw e;
+        }
+      }
+    }
+
+    // add cleanup
+    if (cleanupOldTrash()) {
+      executeTrashCleanup();
+    }
+    return moved;
+  }
+
+  /**
+   * Execute the cleanup.
+   * @throws IOException failure
+   */
+  @VisibleForTesting
+  public void executeTrashCleanup() throws IOException {
+    FileSystem fs = getFileSystem();
+    AtomicLong count = new AtomicLong();
+    long now = Time.now();
+
+    // list the roots, iterate through
+    // expecting only one root for object stores.
+    foreach(
+        remoteIteratorFromIterable(fs.getTrashRoots(false)),
+        trashRoot -> {
+          try {
+            count.addAndGet(deleteCheckpoint(trashRoot.getPath(), false));
+            createCheckpoint(trashRoot.getPath(), new Date(now));

Review Comment:
   moveToTrash() will be called by thousands of clients. IIRC, a new checkpoint is created as long as the Current dir exists, and super.moveToTrash() will create the Current dir if it does not exist. So I'd imagine every moveToTrash() call would create a new checkpoint, which is probably not ideal.
   
   Each client will also try to delete the same set of checkpoints. I'd imagine some of the clients will fail with a FILE_NOT_FOUND exception, because a checkpoint dir has already been removed by another client.
   
   Cleanup is something we need to handle for trash, and if we can make this approach work, I think that would be great.
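
To make the FILE_NOT_FOUND race concrete, here is a minimal sketch of one way a cleanup pass could tolerate it: treat a FileNotFoundException during checkpoint deletion as "another client already removed it" and report success. The class and method names below are hypothetical illustrations, not code from this PR.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class CheckpointCleanupSketch {

  private CheckpointCleanupSketch() {
  }

  /**
   * Delete an expired checkpoint directory, downgrading the
   * FileNotFoundException raised when a concurrent client has
   * already deleted it.
   * @return true if the checkpoint is gone, whether we deleted it
   *         or another client got there first.
   */
  static boolean deleteExpiredCheckpoint(FileSystem fs, Path checkpoint)
      throws IOException {
    try {
      // recursive delete of the checkpoint directory.
      return fs.delete(checkpoint, true);
    } catch (FileNotFoundException e) {
      // another client won the race; the checkpoint is already gone,
      // which is the outcome the cleanup wanted anyway.
      return true;
    }
  }
}

This is the same downgrade pattern the PR already applies in moveToTrash() (re-check after the exception and report success if the work is effectively done), just applied to checkpoint deletion.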





> ABFS: add cloud trash policy with per-schema policy selection
> -------------------------------------------------------------
>
>                 Key: HADOOP-18013
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18013
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/azure
>    Affects Versions: 3.3.2
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> Add a custom TrashPolicy for azure which
> * is patched into the config automatically if "fs.trash.classname" is unset (see the sketch after this list)
> * checks that the source exists at all before trying anything
> * considers itself enabled even if the trash interval is 0
> * if the interval is 0, just does a delete (issue: what about 90s timeouts?)
> * downgrades exceptions/failures during rename: if anything is raised and the 
> source dir isn't there, all is good.
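
Below is a minimal sketch of what the per-schema selection described in the first bullet might look like. The scheme test and the helper class are assumptions for illustration, not the PR's actual mechanism; only the "fs.trash.classname" key and the CloudStoreTrashPolicy class come from the issue and the diff above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CloudStoreTrashPolicy;
import org.apache.hadoop.fs.TrashPolicy;
import org.apache.hadoop.fs.TrashPolicyDefault;

public final class TrashPolicySelectionSketch {

  private TrashPolicySelectionSketch() {
  }

  /**
   * Pick a trash policy class for a filesystem scheme: an explicit
   * "fs.trash.classname" setting always wins; otherwise object store
   * schemes get the cloud policy patched in, and everything else
   * keeps the default.
   */
  static Class<? extends TrashPolicy> selectPolicyClass(
      Configuration conf, String scheme) {
    Class<? extends TrashPolicy> explicit = conf.getClass(
        "fs.trash.classname", null, TrashPolicy.class);
    if (explicit != null) {
      // the user asked for a specific policy; leave it alone.
      return explicit;
    }
    if ("abfs".equals(scheme) || "abfss".equals(scheme)) {
      // hypothetical scheme check, standing in for whatever test the
      // PR settles on for object stores.
      return CloudStoreTrashPolicy.class;
    }
    return TrashPolicyDefault.class;
  }
}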



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
