HDFS-7062. Archival Storage: skip under construction block for migration. 
Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2689b6ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2689b6ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2689b6ca

Branch: refs/heads/HDFS-6581
Commit: 2689b6ca727fff8a13347b811eb4cf79b9d30f48
Parents: dba52ce
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Sep 15 10:16:56 2014 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Sep 15 10:16:56 2014 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/server/mover/Mover.java  | 10 ++-
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java  |  4 +-
 .../hdfs/server/mover/TestStorageMover.java     | 77 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2689b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 96588ff..e336ebc 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -321,7 +321,14 @@ public class Mover {
 
       final LocatedBlocks locatedBlocks = status.getBlockLocations();
       boolean hasRemaining = false;
-      for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
+      List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
+      for(int i = 0; i < lbs.size(); i++) {
+        if (i == lbs.size() - 1 && !lastBlkComplete) {
+          // last block is incomplete, skip it
+          continue;
+        }
+        LocatedBlock lb = lbs.get(i);
         final StorageTypeDiff diff = new StorageTypeDiff(types,
             lb.getStorageTypes());
         if (!diff.removeOverlap()) {
@@ -472,6 +479,7 @@ public class Mover {
           final ExitStatus r = m.run();
 
           if (r == ExitStatus.SUCCESS) {
+            IOUtils.cleanup(LOG, nnc);
             iter.remove();
           } else if (r != ExitStatus.IN_PROGRESS) {
             // must be an error statue, return

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2689b6ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 10012c6..556eca6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -387,8 +387,8 @@ public class DFSAdmin extends FsShell {
     "\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
-    "\t[-setStoragePolicy path policyName\n" +
-    "\t[-getStoragePolicy path\n" +
+    "\t[-setStoragePolicy path policyName]\n" +
+    "\t[-getStoragePolicy path]\n" +
     "\t[-help [cmd]]\n";
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2689b6ca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index d5d5cab..ceedfc2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -30,9 +30,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
@@ -490,6 +495,78 @@ public class TestStorageMover {
   }
 
   /**
+   * Print a big banner in the test log to make debugging easier.
+   */
+  static void banner(String string) {
+    LOG.info("\n\n\n\n================================================\n" +
+        string + "\n" +
+        "==================================================\n\n");
+  }
+
+  /**
+   * Migrate an open file; the under-construction block must not be moved.
+   */
+  @Test
+  public void testMigrateOpenFileToArchival() throws Exception {
+    LOG.info("testMigrateOpenFileToArchival");
+    final Path fooDir = new Path("/foo");
+    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
+    policyMap.put(fooDir, COLD);
+    NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null,
+        BLOCK_SIZE, null, policyMap);
+    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
+    MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+    test.setupCluster();
+
+    // create an open file
+    banner("writing to file /foo/bar");
+    final Path barFile = new Path(fooDir, "bar");
+    DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L);
+    FSDataOutputStream out = test.dfs.append(barFile);
+    out.writeBytes("hello, ");
+    ((DFSOutputStream) out.getWrappedStream()).hsync();
+
+    try {
+      banner("start data migration");
+      test.setStoragePolicy(); // set /foo to COLD
+      test.migrate();
+
+      // make sure the under construction block has not been migrated
+      LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
+          barFile.toString(), BLOCK_SIZE);
+      LOG.info("Locations: " + lbs);
+      List<LocatedBlock> blks = lbs.getLocatedBlocks();
+      Assert.assertEquals(1, blks.size());
+      Assert.assertEquals(1, blks.get(0).getLocations().length);
+
+      banner("finish the migration, continue writing");
+      // make sure the writing can continue
+      out.writeBytes("world!");
+      ((DFSOutputStream) out.getWrappedStream()).hsync();
+      IOUtils.cleanup(LOG, out);
+
+      lbs = test.dfs.getClient().getLocatedBlocks(
+          barFile.toString(), BLOCK_SIZE);
+      LOG.info("Locations: " + lbs);
+      blks = lbs.getLocatedBlocks();
+      Assert.assertEquals(1, blks.size());
+      Assert.assertEquals(1, blks.get(0).getLocations().length);
+
+      banner("finish writing, starting reading");
+      // check the content of /foo/bar
+      FSDataInputStream in = test.dfs.open(barFile);
+      byte[] buf = new byte[13];
+      // read from offset BLOCK_SIZE, where the appended data starts
+      in.readFully(BLOCK_SIZE, buf, 0, buf.length);
+      IOUtils.cleanup(LOG, in);
+      Assert.assertEquals("hello, world!", new String(buf));
+    } finally {
+      test.shutdownCluster();
+    }
+  }
+
+  /**
    * Test directories with Hot, Warm and Cold polices.
    */
   @Test

Reply via email to