[
https://issues.apache.org/jira/browse/HADOOP-18013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603277#comment-17603277
]
ASF GitHub Bot commented on HADOOP-18013:
-----------------------------------------
xinglin commented on code in PR #4729:
URL: https://github.com/apache/hadoop/pull/4729#discussion_r969020224
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CloudStoreTrashPolicy.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.Time;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsSupport.bindToDurationTrackerFactory;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_CREATE_CHECKPOINT;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_DELETE_CHECKPOINT;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.TRASH_MOVE_TO_TRASH;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+import static
org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
+
+/**
+ * A Cloud Store Trash Policy designed to be resilient to
+ * race conditions and configurable to automatically clean up
+ * the current user's older checkpoints whenever invoked.
+ *
+ *
+ * The duration of trash operations are tracked in
+ * the target FileSystem's statistics, if it is configured
+ * to track these statistics:
+ * <ul>
+ * <li>{@link
org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_CREATE_CHECKPOINT}</li>
+ * <li>{@link
org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_DELETE_CHECKPOINT}</li>
+ * <li>{@link
org.apache.hadoop.fs.statistics.StoreStatisticNames#TRASH_MOVE_TO_TRASH}</li>
+ * </ul>
+ */
+public class CloudStoreTrashPolicy extends TrashPolicyDefault {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CloudStoreTrashPolicy.class);
+
+ /**
+ * Configuration option to clean up old trash: {@value}.
+ */
+ public static final String CLEANUP_OLD_CHECKPOINTS =
"fs.trash.cleanup.old.checkpoints";
+
+ /**
+ * Default value of {@link #CLEANUP_OLD_CHECKPOINTS}: {@value}.
+ * This still requires the cleanup interval to be > 0.
+ */
+ public static final boolean CLEANUP_OLD_CHECKPOINTS_DEFAULT = true;
+
+ /**
+ * Should old trash be cleaned up?
+ */
+ private boolean cleanupOldTrash;
+
+ /**
+ * Duration tracker if the FS provides one through its statistics.
+ */
+ private DurationTrackerFactory durationTrackerFactory;
+
+  /**
+   * Empty constructor; all state is set up in
+   * {@link #initialize(Configuration, FileSystem)}.
+   */
+  public CloudStoreTrashPolicy() {
+  }
+
+  /**
+   * Is cleanup of old checkpoints enabled?
+   * Set in {@link #initialize(Configuration, FileSystem)} from the
+   * {@link #CLEANUP_OLD_CHECKPOINTS} option; requires the deletion
+   * interval to be greater than zero.
+   * @return true if old checkpoints are cleaned up when the policy is used.
+   */
+  public boolean cleanupOldTrash() {
+    return cleanupOldTrash;
+  }
+
+  /**
+   * Get the duration tracker factory used to time trash operations.
+   * Bound to the target filesystem's statistics in
+   * {@link #initialize(Configuration, FileSystem)}.
+   * @return the duration tracker factory.
+   */
+  public DurationTrackerFactory getDurationTrackerFactory() {
+    return durationTrackerFactory;
+  }
+
+  /**
+   * Set the duration tracker factory; useful for testing.
+   * @param durationTrackerFactory factory.
+   * @throws NullPointerException if the factory is null.
+   */
+  public void setDurationTrackerFactory(
+      final DurationTrackerFactory durationTrackerFactory) {
+    this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+  }
+
+  /**
+   * Initialize the policy: delegate to the superclass, then determine
+   * whether old-checkpoint cleanup is enabled and bind to any duration
+   * tracker factory the filesystem's statistics provide.
+   */
+  @Override
+  public void initialize(final Configuration conf, final FileSystem fs) {
+    super.initialize(conf, fs);
+    // cleanup requires both a positive deletion interval and the
+    // cleanup option to be enabled (it defaults to true).
+    cleanupOldTrash = getDeletionInterval() > 0
+        && conf.getBoolean(CLEANUP_OLD_CHECKPOINTS,
+        CLEANUP_OLD_CHECKPOINTS_DEFAULT);
+    // get any duration tracker
+    setDurationTrackerFactory(bindToDurationTrackerFactory(fs));
+  }
+
+  /**
+   * Move a path to trash, tracking the duration of the operation and
+   * downgrading race conditions to success: if the source path no longer
+   * exists (before the move, or after an IOException is raised during it),
+   * the move is reported as successful.
+   * If {@link #cleanupOldTrash()} is true, old checkpoints are also
+   * cleaned up after the move.
+   * @param path path to move to trash; made absolute if relative.
+   * @return true if the path was moved (or was already gone);
+   *         false if trash is disabled.
+   * @throws IOException failure to move the path while it still exists,
+   *         or failure during checkpoint cleanup.
+   */
+  @Override
+  public boolean moveToTrash(Path path) throws IOException {
+    if (!isEnabled()) {
+      return false;
+    }
+    boolean moved;
+
+    if (!path.isAbsolute()) {
+      // make path absolute
+      path = new Path(fs.getWorkingDirectory(), path);
+    }
+    if (!fs.exists(path)) {
+      // path doesn't actually exist.
+      LOG.info("'{}' was deleted before it could be moved to trash", path);
+      moved = true;
+    } else {
+
+      try (DurationInfo info = new DurationInfo(LOG, true, "moveToTrash(%s)",
+          path)) {
+
+        // need for lambda expression.
+        Path p = path;
+        moved = invokeTrackingDuration(
+            durationTrackerFactory.trackDuration(TRASH_MOVE_TO_TRASH), () ->
+                super.moveToTrash(p));
+
+      } catch (IOException e) {
+        if (!fs.exists(path)) {
+          // race condition with the trash setup; something else moved it.
+          // note that checking for FNFE is not sufficient as this may occur in
+          // the rename, at which point the exception may get downgraded.
+          LOG.info("'{}' was deleted before it could be moved to trash", path);
+          LOG.debug("IOE raised on moveToTrash({})", path, e);
+          // report success
+          moved = true;
+        } else {
+          // source path still exists, so throw the exception and skip cleanup
+          // don't bother trying to cleanup here as it will only complicate
+          // error reporting
+          throw e;
+        }
+      }
+    }
+
+    // add cleanup
+    if (cleanupOldTrash()) {
+      executeTrashCleanup();
+    }
+    return moved;
+  }
+
+ /**
+ * Execute the cleanup.
+ * @throws IOException failure
+ */
+ @VisibleForTesting
+ public void executeTrashCleanup() throws IOException {
+ FileSystem fs = getFileSystem();
+ AtomicLong count = new AtomicLong();
+ long now = Time.now();
+
+ // list the roots, iterate through
+ // expecting only one root for object stores.
+ foreach(
+ remoteIteratorFromIterable(fs.getTrashRoots(false)),
+ trashRoot -> {
+ try {
+ count.addAndGet(deleteCheckpoint(trashRoot.getPath(), false));
+ createCheckpoint(trashRoot.getPath(), new Date(now));
Review Comment:
moveToTrash() will be called by thousands of clients. IIRC, a new checkpoint
will be created as long as the CURRENT dir exists, and super.moveToTrash() will
create the CURRENT dir if it does not exist. So I'd imagine every moveToTrash()
would create a new checkpoint, which is probably not ideal.
Each client will also try to delete the same set of checkpoints. I'd imagine
some clients will fail with a FILE_NOT_FOUND exception, because a checkpoint
dir may already have been removed by another client.
Cleanup is something we need to handle for trash, and if we can make this
approach work, that would be great.
> ABFS: add cloud trash policy with per-schema policy selection
> -------------------------------------------------------------
>
> Key: HADOOP-18013
> URL: https://issues.apache.org/jira/browse/HADOOP-18013
> Project: Hadoop Common
> Issue Type: Improvement
> Components: fs/azure
> Affects Versions: 3.3.2
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> Add a custom TrashPolicy for azure which
> * is patched in to the config automatically if "fs.trash.classname" is unset
> * checks the source for existing at all before trying anything
> * considers itself enabled even if the trash interval is 0
> * if interval is 0, just does a delete (issue: what about 90s timeouts)
> * downgrades exceptions/failures during rename. If anything is raised and the
> source dir isn't there, all is good.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]