Repository: hadoop
Updated Branches:
  refs/heads/branch-2 6e37e40dd -> 2b0b332e2


HADOOP-12973. Make DU pluggable. (Elliott Clark via cmccabe)

(cherry picked from commit 35f07705552ef5636f8b8d2599a6af5ec1426203)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b0b332e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b0b332e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b0b332e

Branch: refs/heads/branch-2
Commit: 2b0b332e2faedcee34db7b0c640aa1aba54b2f14
Parents: 6e37e40
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Tue Apr 12 16:20:30 2016 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Tue Apr 12 16:29:15 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/fs/CachingGetSpaceUsed.java   | 168 +++++++++++++
 .../src/main/java/org/apache/hadoop/fs/DU.java  | 240 ++++---------------
 .../java/org/apache/hadoop/fs/GetSpaceUsed.java | 147 ++++++++++++
 .../apache/hadoop/fs/WindowsGetSpaceUsed.java   |  46 ++++
 .../test/java/org/apache/hadoop/fs/TestDU.java  |  46 ++--
 .../org/apache/hadoop/fs/TestGetSpaceUsed.java  | 133 ++++++++++
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  93 +++----
 7 files changed, 617 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
new file mode 100644
index 0000000..6ef75d2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Interface for a class that can estimate how much space
+ * is used in a directory.
+ * <p>
+ * The implementor is free to cache the space used. As such, there
+ * are methods to update the cached value with any known changes.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Evolving
+public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
+  static final Logger LOG = LoggerFactory.getLogger(CachingGetSpaceUsed.class);
+
+  protected final AtomicLong used = new AtomicLong();
+  private final AtomicBoolean running = new AtomicBoolean(true);
+  private final long refreshInterval;
+  private final String dirPath;
+  private Thread refreshUsed;
+
+  /**
+   * This is the constructor used by the builder.
+   * All subclasses should provide this constructor.
+   */
+  public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
+      throws IOException {
+    this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
+  }
+
+  /**
+   * Keeps track of disk usage.
+   *
+   * @param path        the path to check disk usage in
+   * @param interval    refresh the disk usage at this interval
+   * @param initialUsed use this value until next refresh
+   * @throws IOException if we fail to refresh the disk usage
+   */
+  CachingGetSpaceUsed(File path,
+                      long interval,
+                      long initialUsed) throws IOException {
+    dirPath = path.getCanonicalPath();
+    refreshInterval = interval;
+    used.set(initialUsed);
+  }
+
+  void init() {
+    if (used.get() < 0) {
+      used.set(0);
+      refresh();
+    }
+
+    if (refreshInterval > 0) {
+      refreshUsed = new Thread(new RefreshThread(this),
+          "refreshUsed-" + dirPath);
+      refreshUsed.setDaemon(true);
+      refreshUsed.start();
+    } else {
+      running.set(false);
+      refreshUsed = null;
+    }
+  }
+
+  protected abstract void refresh();
+
+  /**
+   * @return an estimate of space used in the directory path.
+   */
+  @Override public long getUsed() throws IOException {
+    return Math.max(used.get(), 0);
+  }
+
+  /**
+   * @return The directory path being monitored.
+   */
+  public String getDirPath() {
+    return dirPath;
+  }
+
+  /**
+   * Increment the cached value of used space.
+   */
+  public void incDfsUsed(long value) {
+    used.addAndGet(value);
+  }
+
+  /**
+   * Is the background thread running?
+   */
+  boolean running() {
+    return running.get();
+  }
+
+  /**
+   * How long to wait between runs of the background refresh.
+   */
+  long getRefreshInterval() {
+    return refreshInterval;
+  }
+
+  /**
+   * Reset the current used data amount. This should be called
+   * when the cached value is re-computed.
+   *
+   * @param usedValue new value that should be the disk usage.
+   */
+  protected void setUsed(long usedValue) {
+    this.used.set(usedValue);
+  }
+
+  @Override
+  public void close() throws IOException {
+    running.set(false);
+    if (refreshUsed != null) {
+      refreshUsed.interrupt();
+    }
+  }
+
+  private static final class RefreshThread implements Runnable {
+
+    final CachingGetSpaceUsed spaceUsed;
+
+    RefreshThread(CachingGetSpaceUsed spaceUsed) {
+      this.spaceUsed = spaceUsed;
+    }
+
+    @Override
+    public void run() {
+      while (spaceUsed.running()) {
+        try {
+          Thread.sleep(spaceUsed.getRefreshInterval());
+          // update the used variable
+          spaceUsed.refresh();
+        } catch (InterruptedException e) {
+          LOG.warn("Thread Interrupted waiting to refresh disk information", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+}

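Since CachingGetSpaceUsed owns the refresh thread, the cached value, and the builder plumbing, a pluggable implementation only has to override refresh(). A minimal sketch of such a subclass (the class name and the flat File#length() walk are illustrative, not part of this patch):

package org.apache.hadoop.fs;

import java.io.File;
import java.io.IOException;

public class FileLengthGetSpaceUsed extends CachingGetSpaceUsed {

  // The Builder-arg constructor is what GetSpaceUsed.Builder#build()
  // reflectively looks up, so subclasses should provide one.
  public FileLengthGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
      throws IOException {
    super(builder);
  }

  @Override
  protected void refresh() {
    // Sum top-level file lengths instead of shelling out to du.
    // A real implementation would recurse; kept flat for brevity.
    long total = 0;
    File[] files = new File(getDirPath()).listFiles();
    if (files != null) {
      for (File file : files) {
        total += file.length();
      }
    }
    setUsed(total);
  }
}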
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java
index 5a4f526..f700e4f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java
@@ -17,227 +17,73 @@
  */
 package org.apache.hadoop.fs;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.Shell;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
 
-/** Filesystem disk space usage statistics.  Uses the unix 'du' program*/
+/** Filesystem disk space usage statistics.  Uses the unix 'du' program */
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Evolving
-public class DU extends Shell {
-  private String  dirPath;
+public class DU extends CachingGetSpaceUsed {
+  private DUShell duShell;
 
-  private AtomicLong used = new AtomicLong();
-  private volatile boolean shouldRun = true;
-  private Thread refreshUsed;
-  private IOException duException = null;
-  private long refreshInterval;
-  
-  /**
-   * Keeps track of disk usage.
-   * @param path the path to check disk usage in
-   * @param interval refresh the disk usage at this interval
-   * @throws IOException if we fail to refresh the disk usage
-   */
-  public DU(File path, long interval) throws IOException {
-    this(path, interval, -1L);
+  @VisibleForTesting
+  public DU(File path, long interval, long initialUsed) throws IOException {
+    super(path, interval, initialUsed);
   }
-  
-  /**
-   * Keeps track of disk usage.
-   * @param path the path to check disk usage in
-   * @param interval refresh the disk usage at this interval
-   * @param initialUsed use this value until next refresh
-   * @throws IOException if we fail to refresh the disk usage
-   */
-  public DU(File path, long interval, long initialUsed) throws IOException { 
-    super(0);
 
-    //we set the Shell interval to 0 so it will always run our command
-    //and use this one to set the thread sleep interval
-    this.refreshInterval = interval;
-    this.dirPath = path.getCanonicalPath();
-
-    //populate the used variable if the initial value is not specified.
-    if (initialUsed < 0) {
-      run();
-    } else {
-      this.used.set(initialUsed);
-    }
-  }
-
-  /**
-   * Keeps track of disk usage.
-   * @param path the path to check disk usage in
-   * @param conf configuration object
-   * @throws IOException if we fail to refresh the disk usage
-   */
-  public DU(File path, Configuration conf) throws IOException {
-    this(path, conf, -1L);
+  public DU(CachingGetSpaceUsed.Builder builder) throws IOException {
+    this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
   }
 
-  /**
-   * Keeps track of disk usage.
-   * @param path the path to check disk usage in
-   * @param conf configuration object
-   * @param initialUsed use it until the next refresh.
-   * @throws IOException if we fail to refresh the disk usage
-   */
-  public DU(File path, Configuration conf, long initialUsed)
-      throws IOException {
-    this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY,
-                CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed);
+  @Override
+  protected synchronized void refresh() {
+    if (duShell == null) {
+      duShell = new DUShell();
+    }
+    try {
+      duShell.startRefresh();
+    } catch (IOException ioe) {
+      LOG.warn("Could not get disk usage information", ioe);
+    }
   }
-    
-  
 
-  /**
-   * This thread refreshes the "used" variable.
-   * 
-   * Future improvements could be to not permanently
-   * run this thread, instead run when getUsed is called.
-   **/
-  class DURefreshThread implements Runnable {
-    
+  private final class DUShell extends Shell  {
+    void startRefresh() throws IOException {
+      run();
+    }
     @Override
-    public void run() {
-      
-      while(shouldRun) {
+    public String toString() {
+      return
+          "du -sk " + getDirPath() + "\n" + used.get() + "\t" + getDirPath();
+    }
 
-        try {
-          Thread.sleep(refreshInterval);
-          
-          try {
-            //update the used variable
-            DU.this.run();
-          } catch (IOException e) {
-            synchronized (DU.this) {
-              //save the latest exception so we can return it in getUsed()
-              duException = e;
-            }
-            
-            LOG.warn("Could not get disk usage information", e);
-          }
-        } catch (InterruptedException e) {
-        }
-      }
+    @Override
+    protected String[] getExecString() {
+      return new String[]{"du", "-sk", getDirPath()};
     }
-  }
-  
-  /**
-   * Decrease how much disk space we use.
-   * @param value decrease by this value
-   */
-  public void decDfsUsed(long value) {
-    used.addAndGet(-value);
-  }
 
-  /**
-   * Increase how much disk space we use.
-   * @param value increase by this value
-   */
-  public void incDfsUsed(long value) {
-    used.addAndGet(value);
-  }
-  
-  /**
-   * @return disk space used 
-   * @throws IOException if the shell command fails
-   */
-  public long getUsed() throws IOException {
-    //if the updating thread isn't started, update on demand
-    if(refreshUsed == null) {
-      run();
-    } else {
-      synchronized (DU.this) {
-        //if an exception was thrown in the last run, rethrow
-        if(duException != null) {
-          IOException tmp = duException;
-          duException = null;
-          throw tmp;
-        }
+    @Override
+    protected void parseExecResult(BufferedReader lines) throws IOException {
+      String line = lines.readLine();
+      if (line == null) {
+        throw new IOException("Expecting a line not the end of stream");
+      }
+      String[] tokens = line.split("\t");
+      if (tokens.length == 0) {
+        throw new IOException("Illegal du output");
       }
+      setUsed(Long.parseLong(tokens[0]) * 1024);
     }
-    
-    return Math.max(used.longValue(), 0L);
-  }
-
-  /**
-   * @return the path of which we're keeping track of disk usage
-   */
-  public String getDirPath() {
-    return dirPath;
-  }
 
-
-  /**
-   * Override to hook in DUHelper class. Maybe this can be used more
-   * generally as well on Unix/Linux based systems
-   */
-  @Override
-  protected void run() throws IOException {
-    if (WINDOWS) {
-      used.set(DUHelper.getFolderUsage(dirPath));
-      return;
-    }
-    super.run();
-  }
-  
-  /**
-   * Start the disk usage checking thread.
-   */
-  public void start() {
-    //only start the thread if the interval is sane
-    if(refreshInterval > 0) {
-      refreshUsed = new Thread(new DURefreshThread(), 
-          "refreshUsed-"+dirPath);
-      refreshUsed.setDaemon(true);
-      refreshUsed.start();
-    }
-  }
-  
-  /**
-   * Shut down the refreshing thread.
-   */
-  public void shutdown() {
-    this.shouldRun = false;
-    
-    if(this.refreshUsed != null) {
-      this.refreshUsed.interrupt();
-    }
-  }
-  
-  @Override
-  public String toString() {
-    return
-      "du -sk " + dirPath +"\n" +
-      used + "\t" + dirPath;
   }
 
-  @Override
-  protected String[] getExecString() {
-    return new String[] {"du", "-sk", dirPath};
-  }
-  
-  @Override
-  protected void parseExecResult(BufferedReader lines) throws IOException {
-    String line = lines.readLine();
-    if (line == null) {
-      throw new IOException("Expecting a line not the end of stream");
-    }
-    String[] tokens = line.split("\t");
-    if(tokens.length == 0) {
-      throw new IOException("Illegal du output");
-    }
-    this.used.set(Long.parseLong(tokens[0])*1024);
-  }
 
   public static void main(String[] args) throws Exception {
     String path = ".";
@@ -245,6 +91,10 @@ public class DU extends Shell {
       path = args[0];
     }
 
-    System.out.println(new DU(new File(path), new Configuration()).toString());
+    GetSpaceUsed du = new GetSpaceUsed.Builder().setPath(new File(path))
+                                                .setConf(new Configuration())
+                                                .build();
+    String duResult = du.toString();
+    System.out.println(duResult);
   }
 }

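For reference, the parseExecResult() logic above expects the stock `du -sk <dir>` output: a single line containing a tab-separated size in kilobytes followed by the path, which is why tokens[0] is multiplied by 1024 to get bytes. With an illustrative path and size, the output looks like:

$ du -sk /data/1/dfs/dn
1048576	/data/1/dfs/dn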
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java
new file mode 100644
index 0000000..aebc3f7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+public interface GetSpaceUsed {
+  long getUsed() throws IOException;
+
+  /**
+   * The builder class for GetSpaceUsed instances.
+   */
+  final class Builder {
+    static final Logger LOG = LoggerFactory.getLogger(Builder.class);
+
+    static final String CLASSNAME_KEY = "fs.getspaceused.classname";
+
+    private Configuration conf;
+    private Class<? extends GetSpaceUsed> klass = null;
+    private File path = null;
+    private Long interval = null;
+    private Long initialUsed = null;
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    public Builder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public long getInterval() {
+      if (interval != null) {
+        return interval;
+      }
+      long result = CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT;
+      if (conf == null) {
+        return result;
+      }
+      return conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, result);
+    }
+
+    public Builder setInterval(long interval) {
+      this.interval = interval;
+      return this;
+    }
+
+    public Class<? extends GetSpaceUsed> getKlass() {
+      if (klass != null) {
+        return klass;
+      }
+      Class<? extends GetSpaceUsed> result = null;
+      if (Shell.WINDOWS) {
+        result = WindowsGetSpaceUsed.class;
+      } else {
+        result = DU.class;
+      }
+      if (conf == null) {
+        return result;
+      }
+      return conf.getClass(CLASSNAME_KEY, result, GetSpaceUsed.class);
+    }
+
+    public Builder setKlass(Class<? extends GetSpaceUsed> klass) {
+      this.klass = klass;
+      return this;
+    }
+
+    public File getPath() {
+      return path;
+    }
+
+    public Builder setPath(File path) {
+      this.path = path;
+      return this;
+    }
+
+    public long getInitialUsed() {
+      if (initialUsed == null) {
+        return -1;
+      }
+      return initialUsed;
+    }
+
+    public Builder setInitialUsed(long initialUsed) {
+      this.initialUsed = initialUsed;
+      return this;
+    }
+
+    public GetSpaceUsed build() throws IOException {
+      GetSpaceUsed getSpaceUsed = null;
+      try {
+        Constructor<? extends GetSpaceUsed> cons =
+            getKlass().getConstructor(Builder.class);
+        getSpaceUsed = cons.newInstance(this);
+      } catch (InstantiationException e) {
+        LOG.warn("Error trying to create an instance of " + getKlass(), e);
+      } catch (IllegalAccessException e) {
+        LOG.warn("Error trying to create " + getKlass(), e);
+      } catch (InvocationTargetException e) {
+        LOG.warn("Error trying to create " + getKlass(), e);
+      } catch (NoSuchMethodException e) {
+        LOG.warn("Doesn't look like the class " + getKlass() +
+            " has the needed constructor", e);
+      }
+      // If there were any exceptions then du will be null.
+      // Construct our best guess fallback.
+      if (getSpaceUsed == null) {
+        if (Shell.WINDOWS) {
+          getSpaceUsed = new WindowsGetSpaceUsed(this);
+        } else {
+          getSpaceUsed = new DU(this);
+        }
+      }
+      // Call init after classes constructors have finished.
+      if (getSpaceUsed instanceof CachingGetSpaceUsed) {
+        ((CachingGetSpaceUsed) getSpaceUsed).init();
+      }
+      return getSpaceUsed;
+    }
+
+  }
+}

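Taken together, the Builder is now the single entry point: it picks DU or WindowsGetSpaceUsed by platform, honors fs.getspaceused.classname from the Configuration, and falls back to the platform default if reflection fails. A usage sketch (the path and interval are illustrative):

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GetSpaceUsed;

public class GetSpaceUsedExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Optional: plug in a custom implementation. The class needs a public
    // constructor taking GetSpaceUsed.Builder for build() to find it.
    // conf.set("fs.getspaceused.classname", "com.example.MyGetSpaceUsed");

    GetSpaceUsed spaceUsed = new GetSpaceUsed.Builder()
        .setConf(conf)
        .setPath(new File("/tmp"))   // directory to measure
        .setInterval(60 * 1000L)     // refresh every minute if caching
        .build();

    System.out.println("Used bytes: " + spaceUsed.getUsed());
  }
}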
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java
new file mode 100644
index 0000000..deb1343
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * Class to tell the size of a path on Windows.
+ * Rather than shelling out, on Windows this uses DUHelper.getFolderUsage.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Evolving
+public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
+
+
+  WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException {
+    super(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
+  }
+
+  /**
+   * Override to hook in DUHelper class.
+   */
+  @Override
+  protected void refresh() {
+    used.set(DUHelper.getFolderUsage(getDirPath()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java
index dded9fb..739263f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.test.GenericTestUtils;
 
-/** This test makes sure that "DU" does not get to run on each call to getUsed */ 
+/** This test makes sure that "DU" does not get to run on each call to getUsed */
 public class TestDU extends TestCase {
   final static private File DU_DIR = GenericTestUtils.getTestDir("dutmp");
 
@@ -42,7 +42,7 @@ public class TestDU extends TestCase {
   public void tearDown() throws IOException {
       FileUtil.fullyDelete(DU_DIR);
   }
-    
+
   private void createFile(File newFile, int size) throws IOException {
     // write random data so that filesystems with compression enabled (e.g., ZFS)
     // can't compress the file
@@ -54,18 +54,18 @@ public class TestDU extends TestCase {
     RandomAccessFile file = new RandomAccessFile(newFile, "rws");
 
     file.write(data);
-      
+
     file.getFD().sync();
     file.close();
   }
 
   /**
    * Verify that du returns expected used space for a file.
-   * We assume here that if a file system crates a file of size 
+   * We assume here that if a file system creates a file of size
    * that is a multiple of the block size in this file system,
    * then the used size for the file will be exactly that size.
    * This is true for most file systems.
-   * 
+   *
    * @throws IOException
    * @throws InterruptedException
    */
@@ -78,28 +78,29 @@ public class TestDU extends TestCase {
     createFile(file, writtenSize);
 
     Thread.sleep(5000); // let the metadata updater catch up
-    
-    DU du = new DU(file, 10000);
-    du.start();
+
+    DU du = new DU(file, 10000, -1);
+    du.init();
     long duSize = du.getUsed();
-    du.shutdown();
+    du.close();
 
     assertTrue("Invalid on-disk size",
         duSize >= writtenSize &&
         writtenSize <= (duSize + slack));
-    
-    //test with 0 interval, will not launch thread 
-    du = new DU(file, 0);
-    du.start();
+
+    //test with 0 interval, will not launch thread
+    du = new DU(file, 0, -1);
+    du.init();
     duSize = du.getUsed();
-    du.shutdown();
-    
+    du.close();
+
     assertTrue("Invalid on-disk size",
         duSize >= writtenSize &&
         writtenSize <= (duSize + slack));
-    
-    //test without launching thread 
-    du = new DU(file, 10000);
+
+    //test without launching thread
+    du = new DU(file, 10000, -1);
+    du.init();
     duSize = du.getUsed();
 
     assertTrue("Invalid on-disk size",
@@ -111,8 +112,8 @@ public class TestDU extends TestCase {
     assertTrue(file.createNewFile());
     Configuration conf = new Configuration();
     conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
-    DU du = new DU(file, conf);
-    du.decDfsUsed(Long.MAX_VALUE);
+    DU du = new DU(file, 10000L, -1);
+    du.incDfsUsed(-Long.MAX_VALUE);
     long duSize = du.getUsed();
     assertTrue(String.valueOf(duSize), duSize >= 0L);
   }
@@ -121,7 +122,7 @@ public class TestDU extends TestCase {
     File file = new File(DU_DIR, "dataX");
     createFile(file, 8192);
     DU du = new DU(file, 3000, 1024);
-    du.start();
+    du.init();
     assertTrue("Initial usage setting not honored", du.getUsed() == 1024);
 
     // wait until the first du runs.
@@ -131,4 +132,7 @@ public class TestDU extends TestCase {
 
     assertTrue("Usage didn't get updated", du.getUsed() == 8192);
   }
+
+
+
 }

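The test changes above track the lifecycle change: DU no longer exposes start()/shutdown(). Code that constructs a DU directly through the @VisibleForTesting constructor calls init() to run the first refresh and start the background thread, and close() to stop it; production code should go through the builder, which calls init() itself. Roughly (the path and interval are illustrative):

package org.apache.hadoop.fs;  // init() is package-private, hence the builder

import java.io.File;
import java.io.IOException;

public class DULifecycleSketch {
  public static void main(String[] args) throws IOException {
    // initialUsed of -1 makes init() run a first refresh synchronously.
    DU du = new DU(new File("/tmp"), 10000, -1);
    du.init();    // replaces the old start()
    try {
      System.out.println(du.getUsed());
    } finally {
      du.close(); // replaces the old shutdown()
    }
  }
}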
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java
new file mode 100644
index 0000000..f436713
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestGetSpaceUsed.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+public class TestGetSpaceUsed {
+  final static private File DIR = new File(
+      System.getProperty("test.build.data", "/tmp"), "TestGetSpaceUsed");
+
+  @Before
+  public void setUp() {
+    FileUtil.fullyDelete(DIR);
+    assertTrue(DIR.mkdirs());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(DIR);
+  }
+
+  /**
+   * Test that the builder can create the class specified through the configuration.
+   */
+  @Test
+  public void testBuilderConf() throws Exception {
+    File file = new File(DIR, "testBuilderConf");
+    assertTrue(file.createNewFile());
+    Configuration conf = new Configuration();
+    conf.set("fs.getspaceused.classname", DummyDU.class.getName());
+    CachingGetSpaceUsed instance =
+        (CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
+            .setPath(file)
+            .setInterval(0)
+            .setConf(conf)
+            .build();
+    assertNotNull(instance);
+    assertTrue(instance instanceof DummyDU);
+    assertFalse(instance.running());
+    instance.close();
+  }
+
+  @Test
+  public void testBuildInitial() throws Exception {
+    File file = new File(DIR, "testBuildInitial");
+    assertTrue(file.createNewFile());
+    CachingGetSpaceUsed instance =
+        (CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
+            .setPath(file)
+            .setInitialUsed(90210)
+            .setKlass(DummyDU.class)
+            .build();
+    assertEquals(90210, instance.getUsed());
+    instance.close();
+  }
+
+  @Test
+  public void testBuildInterval() throws Exception {
+    File file = new File(DIR, "testBuildInitial");
+    assertTrue(file.createNewFile());
+    CachingGetSpaceUsed instance =
+        (CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
+            .setPath(file)
+            .setInitialUsed(90210)
+            .setInterval(50060)
+            .setKlass(DummyDU.class)
+            .build();
+    assertEquals(50060, instance.getRefreshInterval());
+    instance.close();
+  }
+
+  @Test
+  public void testBuildNonCaching() throws Exception {
+    File file = new File(DIR, "testBuildNonCaching");
+    assertTrue(file.createNewFile());
+    GetSpaceUsed instance =  new CachingGetSpaceUsed.Builder()
+            .setPath(file)
+            .setInitialUsed(90210)
+            .setInterval(50060)
+            .setKlass(DummyGetSpaceUsed.class)
+            .build();
+    assertEquals(300, instance.getUsed());
+    assertTrue(instance instanceof DummyGetSpaceUsed);
+  }
+
+  private static class DummyDU extends CachingGetSpaceUsed {
+
+    public DummyDU(Builder builder) throws IOException {
+      // Push to the base class.
+      // Most times that's all that will need to be done.
+      super(builder);
+    }
+
+    @Override
+    protected void refresh() {
+      // This is a test so don't du anything.
+    }
+  }
+
+  private static class DummyGetSpaceUsed implements GetSpaceUsed {
+
+    public DummyGetSpaceUsed(GetSpaceUsed.Builder builder) {
+
+    }
+
+    @Override public long getUsed() throws IOException {
+      return 300;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b0b332e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 14faa71..57804d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -36,8 +36,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -62,10 +63,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
 
 /**
- * A block pool slice represents a portion of a block pool stored on a volume. 
- * Taken together, all BlockPoolSlices sharing a block pool ID across a 
+ * A block pool slice represents a portion of a block pool stored on a volume.
+ * Taken together, all BlockPoolSlices sharing a block pool ID across a
  * cluster represent a single block pool.
- * 
+ *
  * This class is synchronized by {@link FsVolumeImpl}.
  */
 class BlockPoolSlice {
@@ -92,10 +93,10 @@ class BlockPoolSlice {
   private final Timer timer;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
-  private final DU dfsUsage;
+  private final GetSpaceUsed dfsUsage;
 
   /**
-   * Create a blook pool slice 
+   * Create a block pool slice
    * @param bpid Block pool Id
    * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
    * @param bpDir directory corresponding to the BlockPool
@@ -107,7 +108,7 @@ class BlockPoolSlice {
       Configuration conf, Timer timer) throws IOException {
     this.bpid = bpid;
     this.volume = volume;
-    this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
+    this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
     this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
     this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
@@ -157,8 +158,10 @@ class BlockPoolSlice {
     }
     // Use cached value initially if available. Or the following call will
     // block until the initial du command completes.
-    this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
-    this.dfsUsage.start();
+    this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
+                                                     .setConf(conf)
+                                                     .setInitialUsed(loadDfsUsed())
+                                                     .build();
 
     // Make the dfs usage to be saved during shutdown.
     ShutdownHookManager.get().addShutdownHook(
@@ -179,7 +182,7 @@ class BlockPoolSlice {
   File getFinalizedDir() {
     return finalizedDir;
   }
-  
+
   File getLazypersistDir() {
     return lazypersistDir;
   }
@@ -194,17 +197,21 @@ class BlockPoolSlice {
 
   /** Run DU on local drives.  It must be synchronized from caller. */
   void decDfsUsed(long value) {
-    dfsUsage.decDfsUsed(value);
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed)dfsUsage).incDfsUsed(-value);
+    }
   }
-  
+
   long getDfsUsed() throws IOException {
     return dfsUsage.getUsed();
   }
 
   void incDfsUsed(long value) {
-    dfsUsage.incDfsUsed(value);
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed)dfsUsage).incDfsUsed(value);
+    }
   }
-  
+
   /**
    * Read in the cached DU value and return it if it is less than
    * cachedDfsUsedCheckTime which is set by
@@ -310,7 +317,10 @@ class BlockPoolSlice {
     }
     File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
-    dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
+          b.getNumBytes() + metaFile.length());
+    }
     return blockFile;
   }
 
@@ -337,7 +347,7 @@ class BlockPoolSlice {
   }
 
 
-    
+
   void getVolumeMap(ReplicaMap volumeMap,
                     final RamDiskReplicaTracker lazyWriteReplicaMap)
       throws IOException {
@@ -348,7 +358,7 @@ class BlockPoolSlice {
       FsDatasetImpl.LOG.info(
           "Recovered " + numRecovered + " replicas from " + lazypersistDir);
     }
-    
+
     boolean  success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
     if (!success) {
       // add finalized replicas
@@ -442,7 +452,7 @@ class BlockPoolSlice {
     FileUtil.fullyDelete(source);
     return numRecovered;
   }
-  
+
   private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized)
       throws IOException {
@@ -450,7 +460,7 @@ class BlockPoolSlice {
     long blockId = block.getBlockId();
     long genStamp = block.getGenerationStamp();
     if (isFinalized) {
-      newReplica = new FinalizedReplica(blockId, 
+      newReplica = new FinalizedReplica(blockId,
           block.getNumBytes(), genStamp, volume, DatanodeUtil
           .idToBlockDir(finalizedDir, blockId));
     } else {
@@ -467,7 +477,7 @@ class BlockPoolSlice {
           // We don't know the expected block length, so just use 0
           // and don't reserve any more space for writes.
           newReplica = new ReplicaBeingWritten(blockId,
-              validateIntegrityAndSetLength(file, genStamp), 
+              validateIntegrityAndSetLength(file, genStamp),
               genStamp, volume, file.getParentFile(), null, 0);
           loadRwr = false;
         }
@@ -513,7 +523,7 @@ class BlockPoolSlice {
       incrNumBlocks();
     }
   }
-  
+
 
   /**
    * Add replicas under the given directory to the volume map
@@ -543,12 +553,12 @@ class BlockPoolSlice {
       }
       if (!Block.isBlockFilename(file))
         continue;
-      
+
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
           files, file);
       long blockId = Block.filename2id(file.getName());
-      Block block = new Block(blockId, file.length(), genStamp); 
-      addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, 
+      Block block = new Block(blockId, file.length(), genStamp);
+      addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
           isFinalized);
     }
   }
@@ -642,11 +652,11 @@ class BlockPoolSlice {
 
   /**
    * Find out the number of bytes in the block that match its crc.
-   * 
-   * This algorithm assumes that data corruption caused by unexpected 
+   *
+   * This algorithm assumes that data corruption caused by unexpected
    * datanode shutdown occurs only in the last crc chunk. So it checks
    * only the last chunk.
-   * 
+   *
    * @param blockFile the block file
    * @param genStamp generation stamp of the block
    * @return the number of valid bytes
@@ -673,7 +683,7 @@ class BlockPoolSlice {
       int bytesPerChecksum = checksum.getBytesPerChecksum();
       int checksumSize = checksum.getChecksumSize();
       long numChunks = Math.min(
-          (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, 
+          (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
           (metaFileLen - crcHeaderLen)/checksumSize);
       if (numChunks == 0) {
         return 0;
@@ -716,17 +726,20 @@ class BlockPoolSlice {
       IOUtils.closeStream(blockIn);
     }
   }
-    
+
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();
   }
-  
+
   void shutdown(BlockListAsLongs blocksListToPersist) {
     saveReplicas(blocksListToPersist);
     saveDfsUsed();
     dfsUsedSaved = true;
-    dfsUsage.shutdown();
+
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      IOUtils.cleanup(LOG, ((CachingGetSpaceUsed) dfsUsage));
+    }
   }
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
@@ -735,17 +748,17 @@ class BlockPoolSlice {
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
-      LOG.info("Replica Cache file: "+  replicaFile.getPath() + 
+      LOG.info("Replica Cache file: "+  replicaFile.getPath() +
           " doesn't exist ");
       return false;
     }
     long fileLastModifiedTime = replicaFile.lastModified();
     if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
-      LOG.info("Replica Cache file: " + replicaFile.getPath() + 
+      LOG.info("Replica Cache file: " + replicaFile.getPath() +
           " has gone stale");
       // Just to make findbugs happy
       if (!replicaFile.delete()) {
-        LOG.info("Replica Cache file: " + replicaFile.getPath() + 
+        LOG.info("Replica Cache file: " + replicaFile.getPath() +
             " cannot be deleted");
       }
       return false;
@@ -782,7 +795,7 @@ class BlockPoolSlice {
         iter.remove();
         volumeMap.add(bpid, info);
       }
-      LOG.info("Successfully read replica from cache file : " 
+      LOG.info("Successfully read replica from cache file : "
           + replicaFile.getPath());
       return true;
     } catch (Exception e) {
@@ -800,10 +813,10 @@ class BlockPoolSlice {
       // close the inputStream
       IOUtils.closeStream(inputStream);
     }
-  } 
-  
+  }
+
   private void saveReplicas(BlockListAsLongs blocksListToPersist) {
-    if (blocksListToPersist == null || 
+    if (blocksListToPersist == null ||
         blocksListToPersist.getNumberOfBlocks()== 0) {
       return;
     }
@@ -819,7 +832,7 @@ class BlockPoolSlice {
           replicaCacheFile.getPath());
       return;
     }
-    
+
     FileOutputStream out = null;
     try {
       out = new FileOutputStream(tmpFile);
@@ -833,7 +846,7 @@ class BlockPoolSlice {
       // and continue.
       LOG.warn("Failed to write replicas to cache ", e);
       if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
-        LOG.warn("Failed to delete replicas file: " + 
+        LOG.warn("Failed to delete replicas file: " +
             replicaCacheFile.getPath());
       }
     } finally {

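Since dfsUsage is now typed as the GetSpaceUsed interface, every delta-tracking call in BlockPoolSlice is guarded the same way: only caching implementations track increments, while non-caching ones recompute on demand, so the delta can be dropped safely. The repeated pattern reads like this (hypothetical) helper:

// Hypothetical helper, not part of the patch: apply a usage delta only
// when the implementation caches, mirroring the instanceof guards above.
private static void applyDelta(GetSpaceUsed spaceUsed, long delta) {
  if (spaceUsed instanceof CachingGetSpaceUsed) {
    ((CachingGetSpaceUsed) spaceUsed).incDfsUsed(delta);
  }
  // Non-caching implementations ignore deltas; getUsed() recomputes.
}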