Repository: hadoop
Updated Branches:
  refs/heads/branch-2 29fe5af01 -> f3cdf29af


HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f3cdf29a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f3cdf29a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f3cdf29a

Branch: refs/heads/branch-2
Commit: f3cdf29af4c67a1963f51f02bf88075bf6dce679
Parents: 29fe5af
Author: Wei-Chiu Chuang <weic...@apache.org>
Authored: Sat Feb 25 21:13:51 2017 -0800
Committer: Wei-Chiu Chuang <weic...@apache.org>
Committed: Sat Feb 25 21:13:51 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |   6 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 119 ++++++++++++-------
 .../apache/hadoop/hdfs/TestDFSOutputStream.java |   3 +-
 3 files changed, 83 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
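Context for the change: DataStreamer kept the block being written in a
volatile ExtendedBlock field that the streamer thread reassigned during
block allocation and pipeline recovery, while DFSOutputStream concurrently
read that field's length when persisting and completing the file. A reader
could catch the field between updates and report a stale length, letting
the NameNode commit the block with the wrong size. The patch replaces the
bare field with the synchronized, copy-on-read BlockToWrite class (see the
DataStreamer diff below). A minimal sketch of that pattern, with Block as
a hypothetical stand-in for Hadoop's ExtendedBlock:

    // Illustration only, not HDFS code. Block stands in for ExtendedBlock.
    class Block {
        long numBytes;
        Block(long numBytes) { this.numBytes = numBytes; }
        Block(Block other) { this.numBytes = other.numBytes; } // copy ctor
    }

    class GuardedBlock {
        private Block current;

        // Every access goes through one monitor, and readers get a private
        // copy, so no caller can observe or mutate shared state unlocked.
        synchronized Block get() {
            return current == null ? null : new Block(current);
        }
        synchronized void set(Block b) {
            current = (b == null) ? null : new Block(b);
        }
        synchronized void setNumBytes(long n) {
            assert current != null;
            current.numBytes = n;
        }
    }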


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3cdf29a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 1e549d9..404a644 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -606,9 +606,9 @@ public class DFSOutputStream extends FSOutputSummer
       // update the block length first time irrespective of flag
       if (updateLength || getStreamer().getPersistBlocks().get()) {
         synchronized (this) {
-          if (!getStreamer().streamerClosed()
-              && getStreamer().getBlock() != null) {
-            lastBlockLength = getStreamer().getBlock().getNumBytes();
+          final ExtendedBlock block = getStreamer().getBlock();
+          if (!getStreamer().streamerClosed() && block != null) {
+            lastBlockLength = block.getNumBytes();
           }
         }
       }
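
The hunk above also removes a check-then-act race: getStreamer().getBlock()
was called twice, once for the null check and once for getNumBytes(), and
the streamer thread could swap the block in between (after this patch each
call additionally returns an independent copy). Reading once into a local
makes the check and the use agree. A self-contained toy showing the hazard
on a volatile reference; the names are illustrative, not Hadoop APIs:

    public class CheckThenAct {
        static volatile Long block = 0L;

        public static void main(String[] args) throws Exception {
            Thread writer = new Thread(() -> {
                for (int i = 0; i < 5_000_000; i++) {
                    block = null;      // e.g. old block abandoned
                    block = (long) i;  // e.g. new block installed
                }
            });
            writer.start();

            long failures = 0;
            for (int i = 0; i < 5_000_000; i++) {
                try {
                    if (block != null) {   // read #1: the null check
                        long v = block;    // read #2: may unbox null -> NPE
                    }
                } catch (NullPointerException e) {
                    failures++;
                }
            }
            writer.join();
            System.out.println("check-then-act failures: " + failures);

            // The fix, as in the hunk above: read once into a local.
            final Long b = block;
            if (b != null) {
                long v = b;  // guaranteed consistent with the null check
            }
        }
    }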

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3cdf29a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index aa55a3e..bcf740f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -150,8 +150,6 @@ class DataStreamer extends Daemon {
 
     /**
      * Record a connection exception.
-     * @param e
-     * @throws InvalidEncryptionKeyException
      */
     void recordFailure(final InvalidEncryptionKeyException e)
         throws InvalidEncryptionKeyException {
@@ -186,9 +184,8 @@ class DataStreamer extends Daemon {
         final StorageType[] targetStorageTypes,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //send the TRANSFER_BLOCK request
-      new Sender(out)
-          .transferBlock(block, blockToken, dfsClient.clientName, targets,
-              targetStorageTypes);
+      new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
+          dfsClient.clientName, targets, targetStorageTypes);
       out.flush();
       //ack
       BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -207,6 +204,42 @@ class DataStreamer extends Daemon {
     }
   }
 
+  static class BlockToWrite {
+    private ExtendedBlock currentBlock;
+
+    BlockToWrite(ExtendedBlock block) {
+      setCurrentBlock(block);
+    }
+
+    synchronized ExtendedBlock getCurrentBlock() {
+      return currentBlock == null ? null : new ExtendedBlock(currentBlock);
+    }
+
+    synchronized long getNumBytes() {
+      return currentBlock == null ? 0 : currentBlock.getNumBytes();
+    }
+
+    synchronized void setCurrentBlock(ExtendedBlock block) {
+      currentBlock = (block == null || block.getLocalBlock() == null) ?
+          null : new ExtendedBlock(block);
+    }
+
+    synchronized void setNumBytes(long numBytes) {
+      assert currentBlock != null;
+      currentBlock.setNumBytes(numBytes);
+    }
+
+    synchronized void setGenerationStamp(long generationStamp) {
+      assert currentBlock != null;
+      currentBlock.setGenerationStamp(generationStamp);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return currentBlock == null ? "null" : currentBlock.toString();
+    }
+  }
+
   /**
    * Create a socket for a write pipeline
    *
@@ -420,7 +453,7 @@ class DataStreamer extends Daemon {
   }
 
   private volatile boolean streamerClosed = false;
-  private volatile ExtendedBlock block; // its length is number of bytes acked
+  private final BlockToWrite block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
@@ -481,12 +514,14 @@ class DataStreamer extends Daemon {
   private final String[] favoredNodes;
   private final EnumSet<AddBlockFlag> addBlockFlags;
 
-  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
+  private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+                       DFSClient dfsClient, String src,
                        Progressable progress, DataChecksum checksum,
                        AtomicReference<CachingStrategy> cachingStrategy,
                        ByteArrayManager byteArrayManage,
                        boolean isAppend, String[] favoredNodes,
                        EnumSet<AddBlockFlag> flags) {
+    this.block = new BlockToWrite(block);
     this.dfsClient = dfsClient;
     this.src = src;
     this.progress = progress;
@@ -512,9 +547,8 @@ class DataStreamer extends Daemon {
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage, String[] favoredNodes,
                EnumSet<AddBlockFlag> flags) {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+    this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, false, favoredNodes, flags);
-    this.block = block;
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }
 
@@ -527,10 +561,9 @@ class DataStreamer extends Daemon {
                String src, Progressable progress, DataChecksum checksum,
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage) throws IOException {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage, true, null, null);
+    this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum,
+        cachingStrategy, byteArrayManage, true, null, null);
     stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-    block = lastBlock.getBlock();
     bytesSent = block.getNumBytes();
     accessToken = lastBlock.getBlockToken();
   }
@@ -1295,7 +1328,7 @@ class DataStreamer extends Daemon {
       LocatedBlock lb;
       //get a new datanode
       lb = dfsClient.namenode.getAdditionalDatanode(
-          src, stat.getFileId(), block, nodes, storageIDs,
+          src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs,
           exclude.toArray(new DatanodeInfo[exclude.size()]),
           1, dfsClient.clientName);
       // a new node was allocated by the namenode. Update nodes.
@@ -1407,7 +1440,7 @@ class DataStreamer extends Daemon {
     } // while
 
     if (success) {
-      block = updatePipeline(newGS);
+      updatePipeline(newGS);
     }
     return false; // do not sleep, continue processing
   }
@@ -1504,17 +1537,27 @@ class DataStreamer extends Daemon {
   }
 
   private LocatedBlock updateBlockForPipeline() throws IOException {
-    return dfsClient.namenode.updateBlockForPipeline(block,
+    return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(),
         dfsClient.clientName);
   }
 
+  void updateBlockGS(final long newGS) {
+    block.setGenerationStamp(newGS);
+  }
+
   /** update pipeline at the namenode */
-  ExtendedBlock updatePipeline(long newGS) throws IOException {
-    final ExtendedBlock newBlock = new ExtendedBlock(
-        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-        nodes, storageIDs);
-    return newBlock;
+  private void updatePipeline(long newGS) throws IOException {
+    final ExtendedBlock oldBlock = block.getCurrentBlock();
+    // the new GS has been propagated to all DN, it should be ok to update the
+    // local block state
+    updateBlockGS(newGS);
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+        block.getCurrentBlock(), nodes, storageIDs);
+  }
+
+  DatanodeInfo[] getExcludedNodes() {
+    return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+        .keySet().toArray(new DatanodeInfo[0]);
   }
 
   /**
@@ -1529,35 +1572,30 @@ class DataStreamer extends Daemon {
     StorageType[] storageTypes;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
-    ExtendedBlock oldBlock = block;
+    final ExtendedBlock oldBlock = block.getCurrentBlock();
     do {
       errorState.reset();
       lastException.clear();
       success = false;
 
-      DatanodeInfo[] excluded =
-          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-              .keySet()
-              .toArray(new DatanodeInfo[0]);
-      block = oldBlock;
-      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
-      block = lb.getBlock();
+      DatanodeInfo[] excluded = getExcludedNodes();
+      lb = locateFollowingBlock(
+          excluded.length > 0 ? excluded : null, oldBlock);
+      block.setCurrentBlock(lb.getBlock());
       block.setNumBytes(0);
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
       storageTypes = lb.getStorageTypes();
 
-      //
       // Connect to first DataNode in the list.
-      //
       success = createBlockOutputStream(nodes, storageTypes, 0L, false);
 
       if (!success) {
         LOG.warn("Abandoning " + block);
-        dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
-            dfsClient.clientName);
-        block = null;
+        dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
+            stat.getFileId(), src, dfsClient.clientName);
+        block.setCurrentBlock(null);
         final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
@@ -1618,7 +1656,7 @@ class DataStreamer extends Daemon {
 
         // We cannot change the block length in 'block' as it counts the number
         // of bytes ack'ed.
-        ExtendedBlock blockCopy = new ExtendedBlock(block);
+        ExtendedBlock blockCopy = block.getCurrentBlock();
         blockCopy.setNumBytes(stat.getBlockSize());
 
         boolean[] targetPinnings = getPinnings(nodes);
@@ -1728,8 +1766,8 @@ class DataStreamer extends Daemon {
     }
   }
 
-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
-      throws IOException {
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
+      ExtendedBlock oldBlock) throws IOException {
     final DfsClientConf conf = dfsClient.getConf(); 
     int retries = conf.getNumBlockWriteLocateFollowingRetry();
     long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
@@ -1738,7 +1776,7 @@ class DataStreamer extends Daemon {
       while (true) {
         try {
           return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-              block, excludedNodes, stat.getFileId(), favoredNodes,
+              oldBlock, excluded, stat.getFileId(), favoredNodes,
               addBlockFlags);
         } catch (RemoteException e) {
           IOException ue =
@@ -1823,7 +1861,7 @@ class DataStreamer extends Daemon {
    * @return the block this streamer is writing to
    */
   ExtendedBlock getBlock() {
-    return block;
+    return block.getCurrentBlock();
   }
 
   /**
@@ -2016,7 +2054,8 @@ class DataStreamer extends Daemon {
 
   @Override
   public String toString() {
-    return  (block == null? null: block.getLocalBlock())
+    final ExtendedBlock extendedBlock = block.getCurrentBlock();
+    return  (extendedBlock == null ? null : extendedBlock.getLocalBlock())
         + "@" + Arrays.toString(getNodes());
   }
 }
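
The updatePipeline() change above is the core of the fix: the old version
built a fresh ExtendedBlock and returned it for run() to assign back
("block = updatePipeline(newGS)"), so the shared field lagged behind what
the NameNode had already been told. The new version first captures a
private copy as oldBlock, then bumps the generation stamp on the shared
BlockToWrite under its monitor, and hands the NameNode a matching
(old, new) pair; because getCurrentBlock() returns copies, the captured
snapshot is immune to the later mutation. A hedged sketch of that
ordering, reusing the Block/GuardedBlock stand-ins from the first sketch
(the real method mutates the generation stamp rather than the length):

    GuardedBlock block = new GuardedBlock();
    block.set(new Block(1024));

    Block oldBlock = block.get(); // private copy; later mutations miss it
    block.setNumBytes(2048);      // update shared state under the monitor
    Block newBlock = block.get(); // fresh copy reflecting the update

    // oldBlock.numBytes == 1024 and newBlock.numBytes == 2048: the
    // (old, new) pair an updatePipeline-style RPC must send together.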

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3cdf29a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 750103d..9ec01b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -110,8 +110,7 @@ public class TestDFSOutputStream {
    * packet size < 64kB. See HDFS-7308 for details.
    */
   @Test
-  public void testComputePacketChunkSize()
-      throws Exception {
+  public void testComputePacketChunkSize() throws Exception {
     DistributedFileSystem fs = cluster.getFileSystem();
     FSDataOutputStream os = fs.create(new Path("/test"));
     DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,

