linyiqun commented on a change in pull request #1555: URL: https://github.com/apache/ozone/pull/1555#discussion_r526087678
########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashPolicyOzone.java ########## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrashPolicyDefault; +import 
org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** TrashPolicy for Ozone Specific Trash Operations.Through this implementation + * of TrashPolicy ozone-specific trash optimizations are/will be made such as + * having a multithreaded TrashEmptier. + */ +public class TrashPolicyOzone extends TrashPolicyDefault { + + private static final Logger LOG = + LoggerFactory.getLogger(TrashPolicyOzone.class); + + private static final Path CURRENT = new Path("Current"); + + private final static int TRASH_EMPTIER_CORE_POOL_SIZE = 5; + + private static final FsPermission PERMISSION = + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); + + private static final DateFormat CHECKPOINT = new SimpleDateFormat( + "yyMMddHHmmss"); + /** Format of checkpoint directories used prior to Hadoop 0.23. */ + private static final DateFormat OLD_CHECKPOINT = + new SimpleDateFormat("yyMMddHHmm"); + private static final int MSECS_PER_MINUTE = 60*1000; + + private long emptierInterval; + + public TrashPolicyOzone(){ + } + + private TrashPolicyOzone(FileSystem fs, Configuration conf){ + initialize(conf, fs); + } + + @Override + public void initialize(Configuration conf, FileSystem fs, Path path) { + this.fs = fs; + this.deletionInterval = (long)(conf.getFloat( + FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT) + * MSECS_PER_MINUTE); + this.emptierInterval = (long)(conf.getFloat( + FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) + * MSECS_PER_MINUTE); + } + + @Override + public void initialize(Configuration conf, FileSystem fs) { + this.fs = fs; + this.deletionInterval = (long)(conf.getFloat( + FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT) + * MSECS_PER_MINUTE); + this.emptierInterval = (long)(conf.getFloat( + FS_TRASH_CHECKPOINT_INTERVAL_KEY, 
FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) + * MSECS_PER_MINUTE); + if (deletionInterval < 0) { + LOG.warn("Invalid value {} for deletion interval," + + " deletion interaval can not be negative." + + "Changing to default value 0", deletionInterval); + this.deletionInterval = 0; + } + } + + + public Runnable getEmptier() throws IOException { + return new TrashPolicyOzone.Emptier(getConf(), emptierInterval); + } + + + protected class Emptier implements Runnable { + + private Configuration conf; + // same as checkpoint interval + private long emptierInterval; + + + private ThreadPoolExecutor executor; + + Emptier(Configuration conf, long emptierInterval) throws IOException { + this.conf = conf; + this.emptierInterval = emptierInterval; + if (emptierInterval > deletionInterval || emptierInterval <= 0) { + LOG.info("The configured checkpoint interval is " + + (emptierInterval / MSECS_PER_MINUTE) + " minutes." + + " Using an interval of " + + (deletionInterval / MSECS_PER_MINUTE) + + " minutes that is used for deletion instead"); + this.emptierInterval = deletionInterval; + } + LOG.info("Ozone Manager trash configuration: Deletion interval = " + + (deletionInterval / MSECS_PER_MINUTE) + + " minutes, Emptier interval = " + + (this.emptierInterval / MSECS_PER_MINUTE) + " minutes."); + executor = new ThreadPoolExecutor(TRASH_EMPTIER_CORE_POOL_SIZE, + TRASH_EMPTIER_CORE_POOL_SIZE, 1, + TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Override + public void run() { + if (emptierInterval == 0) { + return; // trash disabled + } + long now, end; + while (true) { + now = Time.now(); + end = ceiling(now, emptierInterval); + try { + // sleep for interval + Thread.sleep(end - now); + } catch (InterruptedException e) { + break; // exit on interrupt + } + + try { + now = Time.now(); + if (now >= end) { + Collection<FileStatus> trashRoots; + trashRoots = fs.getTrashRoots(true); // list all trash dirs + LOG.info("TrashrootSize: " + 
trashRoots.size()); + for (FileStatus trashRoot : trashRoots) { // dump each trash + LOG.info("Trashroot:" + trashRoot.getPath().toString()); + if (!trashRoot.isDirectory()) { + continue; + } + Runnable task = ()->{ + try { + TrashPolicyOzone trash = new TrashPolicyOzone(fs, conf); + trash.deleteCheckpoint(trashRoot.getPath(), false); + trash.createCheckpoint(trashRoot.getPath(), + new Date(Time.now())); + } catch (IOException e) { + LOG.info("Unable to checkpoint"); + } + }; + executor.submit(task); + LOG.info("Current threads in pool: " + + executor.getPoolSize()); + LOG.info("Currently executing threads: " + + executor.getActiveCount()); + LOG.info("Total number of threads(ever scheduled): " + + executor.getTaskCount()); + } + } + } catch (Exception e) { + LOG.warn("RuntimeException during Trash.Emptier.run(): ", e); + } + } + try { + fs.close(); + } catch(IOException e) { + LOG.warn("Trash cannot close FileSystem: ", e); + } Review comment: Can we shut down the executor pool in a finally block? ```java try { while (true) { .... } } finally { executor.shutdown(); } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
