Repository: hadoop
Updated Branches:
refs/heads/branch-2.6 b9a6f9aa1 -> 238458b25
HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in
commitBlock. Contributed by Chang Li.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
Change-Id: Ibd44ff1bf92bad7262db724990a6a64c1975ffb6
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/238458b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/238458b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/238458b2
Branch: refs/heads/branch-2.6
Commit: 238458b25921a652eefead2cebd797c1b9de0343
Parents: b9a6f9a
Author: Kihwal Lee <[email protected]>
Authored: Wed Nov 4 12:10:59 2015 -0600
Committer: Zhe Zhang <[email protected]>
Committed: Tue Nov 24 09:44:50 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 2 +-
.../BlockInfoUnderConstruction.java | 2 +-
.../server/blockmanagement/BlockManager.java | 4 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 67 +++++++++++++
.../TestCommitBlockWithInvalidGenStamp.java | 98 ++++++++++++++++++++
6 files changed, 174 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cc7bae8..5e683e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -20,6 +20,9 @@ Release 2.6.3 - UNRELEASED
HDFS-9083. Replication violates block placement policy (Rushabh Shah)
+ HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in
+ commitBlock. (Chang Li via zhz)
+
Release 2.6.2 - 2015-10-28
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 92dbc8e..21e4d4e 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -363,7 +363,7 @@ public class DFSOutputStream extends FSOutputSummer
//
class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
- private ExtendedBlock block; // its length is number of bytes acked
+ private volatile ExtendedBlock block; // its length is number of bytes
acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index dd3593f..703373e 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -262,7 +262,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
throw new IOException("Trying to commit inconsistent block: id = "
+ block.getBlockId() + ", expected id = " + getBlockId());
blockUCState = BlockUCState.COMMITTED;
- this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+ this.setNumBytes(block.getNumBytes());
// Sort out invalid replicas.
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1febc53..feaf843 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -591,6 +591,10 @@ public class BlockManager {
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
+ if(block.getGenerationStamp() != commitBlock.getGenerationStamp()) {
+ throw new IOException("Commit block with mismatching GS. NN has " +
+ block + ", client submits " + commitBlock);
+ }
block.commitBlock(commitBlock);
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index c728b2b..c012f67 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -63,11 +63,15 @@ import
org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha
.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import
org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
@@ -1636,4 +1640,67 @@ public class DFSTestUtil {
LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
new LayoutVersion.LayoutFeature[] { feature });
}
+
+ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+ Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+ ReceivedDeletedBlockInfo[] receivedBlocks = new
ReceivedDeletedBlockInfo[1];
+ receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+ StorageReceivedDeletedBlocks[] reports = new
StorageReceivedDeletedBlocks[1];
+ reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+ return reports;
+ }
+
+ /**
+ * Adds a block to a file.
+ * This method only manipulates NameNode
+ * states of the file and the block without injecting data to DataNode.
+ * It does mimic block reports.
+ * You should disable periodical heartbeat before use this.
+ * @param dataNodes List DataNodes to host the block
+ * @param previous Previous block in the file
+ * @param len block size
+ * @return The added block
+ */
+ public static Block addBlockToFile(
+ List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
+ String file, INodeFile fileNode,
+ String clientName, ExtendedBlock previous, int len)
+ throws Exception {
+ fs.getClient().namenode.addBlock(file, clientName, previous, null,
+ fileNode.getId(), null);
+
+ final BlockInfo lastBlock =
+ fileNode.getLastBlock();
+ final int groupSize = fileNode.getBlockReplication();
+ assert dataNodes.size() >= groupSize;
+ // 1. RECEIVING_BLOCK IBR
+ for (int i = 0; i < groupSize; i++) {
+ DataNode dn = dataNodes.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i, 0,
+ lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new
DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+
+ // 2. RECEIVED_BLOCK IBR
+ for (int i = 0; i < groupSize; i++) {
+ DataNode dn = dataNodes.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i,
+ len, lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new
DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ lastBlock.setNumBytes(len);
+ return lastBlock;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/238458b2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
new file mode 100644
index 0000000..5f8abc5
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestCommitBlockWithInvalidGenStamp {
+ private static final int BLOCK_SIZE = 1024;
+ private MiniDFSCluster cluster;
+ private FSDirectory dir;
+ private DistributedFileSystem dfs;
+
+ @Before
+ public void setUp() throws IOException {
+ final Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+
+ dir = cluster.getNamesystem().getFSDirectory();
+ dfs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testCommitWithInvalidGenStamp() throws Exception {
+ final Path file = new Path("/file");
+ FSDataOutputStream out = null;
+
+ try {
+ out = dfs.create(file, (short) 1);
+ INodeFile fileNode = dir.getINode4Write(file.toString()).asFile();
+ ExtendedBlock previous = null;
+
+ Block newBlock = DFSTestUtil.addBlockToFile(cluster.getDataNodes(),
+ dfs, cluster.getNamesystem(), file.toString(), fileNode,
+ dfs.getClient().getClientName(), previous, 100);
+ Block newBlockClone = new Block(newBlock);
+ previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+ newBlockClone);
+
+ previous.setGenerationStamp(123);
+ try{
+ dfs.getClient().getNamenode().complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ Assert.fail("should throw exception because invalid genStamp");
+ } catch (IOException e) {
+ Assert.assertTrue(e.toString().contains(
+ "Commit block with mismatching GS. NN has " +
+ newBlock + ", client submits " + newBlockClone));
+ }
+ previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+ newBlock);
+ boolean complete =
dfs.getClient().getNamenode().complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ Assert.assertTrue("should complete successfully", complete);
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+}