This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0aca5c7 HDDS-4346.Ozone specific Trash Policy (#1535)
0aca5c7 is described below
commit 0aca5c7058252a60db91ac06a5ded931e1e08296
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Wed Nov 18 16:51:46 2020 +0530
HDDS-4346.Ozone specific Trash Policy (#1535)
---
.../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 8 +
.../org/apache/hadoop/ozone/om/OzoneManager.java | 4 +-
.../apache/hadoop/ozone/om/TrashPolicyOzone.java | 235 +++++++++++++++++++++
3 files changed, 246 insertions(+), 1 deletion(-)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 46528ff..ab35bbb 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -42,6 +43,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.TrashPolicyOzone;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -148,6 +150,8 @@ public class TestRootedOzoneFileSystem {
conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
// fs.ofs.impl would be loaded from META-INF, no need to manually set it
fs = FileSystem.get(conf);
+ conf.setClass("fs.trash.classname", TrashPolicyOzone.class,
+ TrashPolicy.class);
trash = new Trash(conf);
ofs = (RootedOzoneFileSystem) fs;
adapter = (BasicRootedOzoneClientAdapterImpl) ofs.getAdapter();
@@ -1186,6 +1190,10 @@ public class TestRootedOzoneFileSystem {
try (FSDataOutputStream stream = fs.create(path)) {
stream.write(1);
}
+ Assert.assertTrue(trash.getConf().getClass(
+ "fs.trash.classname", TrashPolicy.class).
+ isAssignableFrom(TrashPolicyOzone.class));
+
// Call moveToTrash. We can't call protected fs.rename() directly
trash.moveToTrash(path);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f8e9fb1..2d2c9f4 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -1286,7 +1287,8 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
return FileSystem.get(fsconf);
}
});
-
+ conf.setClass("fs.trash.classname", TrashPolicyOzone.class,
+ TrashPolicy.class);
this.emptier = new Thread(new Trash(fs, conf).
getEmptier(), "Trash Emptier");
this.emptier.setDaemon(true);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java
new file mode 100644
index 0000000..c0278bc
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.TrashPolicyDefault;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TrashPolicy for Ozone Specific Trash Operations.Through this implementation
+ * of TrashPolicy ozone-specific trash optimizations are/will be made such as
+ * having a multithreaded TrashEmptier.
+ */
+public class TrashPolicyOzone extends TrashPolicyDefault {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TrashPolicyOzone.class);
+
+ private static final Path CURRENT = new Path("Current");
+
+ private static final FsPermission PERMISSION =
+ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+ private static final DateFormat CHECKPOINT = new SimpleDateFormat(
+ "yyMMddHHmmss");
+ /** Format of checkpoint directories used prior to Hadoop 0.23. */
+ private static final DateFormat OLD_CHECKPOINT =
+ new SimpleDateFormat("yyMMddHHmm");
+ private static final int MSECS_PER_MINUTE = 60*1000;
+
+ private long emptierInterval;
+
+ public TrashPolicyOzone(){
+ }
+
+ private TrashPolicyOzone(FileSystem fs, Configuration conf){
+ super.initialize(conf, fs);
+ }
+
+ @Override
+ public Runnable getEmptier() throws IOException {
+ return new TrashPolicyOzone.Emptier(getConf(), emptierInterval);
+ }
+
+ protected class Emptier implements Runnable {
+
+ private Configuration conf;
+ // same as checkpoint interval
+ private long emptierInterval;
+
+ Emptier(Configuration conf, long emptierInterval) throws IOException {
+ this.conf = conf;
+ this.emptierInterval = emptierInterval;
+ if (emptierInterval > deletionInterval || emptierInterval <= 0) {
+ LOG.info("The configured checkpoint interval is " +
+ (emptierInterval / MSECS_PER_MINUTE) + " minutes." +
+ " Using an interval of " +
+ (deletionInterval / MSECS_PER_MINUTE) +
+ " minutes that is used for deletion instead");
+ this.emptierInterval = deletionInterval;
+ }
+ LOG.info("Ozone Manager trash configuration: Deletion interval = "
+ + (deletionInterval / MSECS_PER_MINUTE)
+ + " minutes, Emptier interval = "
+ + (this.emptierInterval / MSECS_PER_MINUTE) + " minutes.");
+ }
+
+ @Override
+ public void run() {
+ if (emptierInterval == 0) {
+ return; // trash disabled
+ }
+ long now, end;
+ while (true) {
+ now = Time.now();
+ end = ceiling(now, emptierInterval);
+ try { // sleep for interval
+ Thread.sleep(end - now);
+ } catch (InterruptedException e) {
+ break; // exit on interrupt
+ }
+
+ try {
+ now = Time.now();
+ if (now >= end) {
+ Collection<FileStatus> trashRoots;
+ trashRoots = fs.getTrashRoots(true); // list all trash dirs
+
+ for (FileStatus trashRoot : trashRoots) { // dump each trash
+ if (!trashRoot.isDirectory()) {
+ continue;
+ }
+ try {
+ TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf);
+ trash.deleteCheckpoint(trashRoot.getPath(), false);
+ trash.createCheckpoint(trashRoot.getPath(), new Date(now));
+ } catch (IOException e) {
+ LOG.warn("Trash caught: "+e+". Skipping " +
+ trashRoot.getPath() + ".");
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("RuntimeException during Trash.Emptier.run(): ", e);
+ }
+ }
+ try {
+ fs.close();
+ } catch(IOException e) {
+ LOG.warn("Trash cannot close FileSystem: ", e);
+ }
+ }
+
+ private long ceiling(long time, long interval) {
+ return floor(time, interval) + interval;
+ }
+ private long floor(long time, long interval) {
+ return (time / interval) * interval;
+ }
+
+ }
+
+ private void createCheckpoint(Path trashRoot, Date date) throws IOException {
+ if (!fs.exists(new Path(trashRoot, CURRENT))) {
+ return;
+ }
+ Path checkpointBase;
+ synchronized (CHECKPOINT) {
+ checkpointBase = new Path(trashRoot, CHECKPOINT.format(date));
+ }
+ Path checkpoint = checkpointBase;
+ Path current = new Path(trashRoot, CURRENT);
+
+ int attempt = 0;
+ while (true) {
+ try {
+ fs.rename(current, checkpoint);
+ LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
+ break;
+ } catch (FileAlreadyExistsException e) {
+ if (++attempt > 1000) {
+ throw new IOException("Failed to checkpoint trash: " + checkpoint);
+ }
+ checkpoint = checkpointBase.suffix("-" + attempt);
+ }
+ }
+ }
+
+ private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
+ throws IOException {
+ LOG.info("TrashPolicyOzone#deleteCheckpoint for trashRoot: " + trashRoot);
+
+ FileStatus[] dirs = null;
+ try {
+ dirs = fs.listStatus(trashRoot); // scan trash sub-directories
+ } catch (FileNotFoundException fnfe) {
+ return;
+ }
+
+ long now = Time.now();
+ for (int i = 0; i < dirs.length; i++) {
+ Path path = dirs[i].getPath();
+ String dir = path.toUri().getPath();
+ String name = path.getName();
+ if (name.equals(CURRENT.getName())) { // skip current
+ continue;
+ }
+
+ long time;
+ try {
+ time = getTimeFromCheckpoint(name);
+ } catch (ParseException e) {
+ LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
+ continue;
+ }
+
+ if (((now - deletionInterval) > time) || deleteImmediately) {
+ if (fs.delete(path, true)) {
+ LOG.info("Deleted trash checkpoint: "+dir);
+ } else {
+ LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
+ }
+ }
+ }
+ }
+
+ private long getTimeFromCheckpoint(String name) throws ParseException {
+ long time;
+
+ try {
+ synchronized (CHECKPOINT) {
+ time = CHECKPOINT.parse(name).getTime();
+ }
+ } catch (ParseException pe) {
+ // Check for old-style checkpoint directories left over
+ // after an upgrade from Hadoop 1.x
+ synchronized (OLD_CHECKPOINT) {
+ time = OLD_CHECKPOINT.parse(name).getTime();
+ }
+ }
+
+ return time;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]