Author: szetszwo
Date: Sat Feb 1 09:01:41 2014
New Revision: 1563385

URL: http://svn.apache.org/r1563385
Log:
Merge r1555021 through r1563384 from trunk.
Added:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java
      - copied unchanged from r1563384, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/
      - copied from r1563384, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
      - copied unchanged from r1563384, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
      - copied unchanged from r1563384, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
      - copied unchanged from r1563384, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
      - copied unchanged from r1563384, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java

Removed:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

Modified:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Propchange: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1563042-1563384

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Sat Feb 1 09:01:41 2014
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -163,8 +164,9 @@ class DFSClientCache {
     return new CacheLoader<String, DFSClient>() {
       @Override
       public DFSClient load(String userName) throws Exception {
-        UserGroupInformation ugi = UserGroupInformation
-            .createRemoteUser(userName);
+        UserGroupInformation ugi = getUserGroupInformation(
+                userName,
+                UserGroupInformation.getCurrentUser());
 
         // Guava requires CacheLoader never returns null.
         return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
@@ -177,6 +179,28 @@ class DFSClientCache {
     };
   }
 
+  /**
+   * This method uses the currentUser and the realUser to create a proxy.
+   * @param effectiveUser The user who is being proxied by the real user
+   * @param realUser The actual user who does the command
+   * @return Proxy UserGroupInformation
+   * @throws IOException If proxying fails
+   */
+  UserGroupInformation getUserGroupInformation(
+          String effectiveUser,
+          UserGroupInformation realUser)
+          throws IOException {
+    Preconditions.checkNotNull(effectiveUser);
+    Preconditions.checkNotNull(realUser);
+    UserGroupInformation ugi =
+            UserGroupInformation.createProxyUser(effectiveUser, realUser);
+    if (LOG.isDebugEnabled()){
+      LOG.debug(String.format("Created ugi:" +
+              " %s for username: %s", ugi, effectiveUser));
+    }
+    return ugi;
+  }
+
   private RemovalListener<String, DFSClient> clientRemovalListener() {
     return new RemovalListener<String, DFSClient>() {
       @Override
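[Editor's illustration] The DFSClientCache hunk above is the core of the HDFS-5804 fix: instead of fabricating a bare remote user, the gateway now impersonates the NFS client user through Hadoop's proxy-user mechanism, so the NameNode can authorize the access against its proxy-user settings. A minimal standalone sketch of the same pattern (illustrative only; the class name and the "alice" user are placeholders, not part of this commit):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUserSketch {
      public static void main(String[] args) throws Exception {
        // The gateway's own (e.g. Kerberos-authenticated) identity acts as
        // the real user; it must be granted proxy privileges on the NameNode.
        UserGroupInformation realUser = UserGroupInformation.getCurrentUser();

        // "alice" stands in for the user name taken from the NFS request.
        UserGroupInformation proxyUgi =
            UserGroupInformation.createProxyUser("alice", realUser);

        // All filesystem calls then run as the effective (proxied) user.
        proxyUgi.doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            System.out.println(fs.getFileStatus(new Path("/")));
            return null;
          }
        });
      }
    }

The TestReaddir and TestWrites hunks below show the matching server-side half: the real user is granted proxy privileges via ProxyUsers.getProxySuperuserGroupConfKey / getProxySuperuserIpConfKey before the impersonation is allowed.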
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Sat Feb 1 09:01:41 2014
@@ -479,9 +479,9 @@ public class RpcProgramNfs3 extends RpcP
     }
 
     try {
-      // Use superUserClient to get file attr since we don't know whether the
-      // NFS client user has access permission to the file
-      attrs = writeManager.getFileAttr(superUserClient, handle, iug);
+      // HDFS-5804 removed superUserClient access
+      attrs = writeManager.getFileAttr(dfsClient, handle, iug);
+
       if (attrs == null) {
         LOG.error("Can't get path for fileId:" + handle.getFileId());
         return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
@@ -603,8 +603,10 @@ public class RpcProgramNfs3 extends RpcP
     // Only do access check.
     try {
       // Don't read from cache. Client may not have read permission.
-      attrs = Nfs3Utils.getFileAttr(superUserClient,
-          Nfs3Utils.getFileIdPath(handle), iug);
+      attrs = Nfs3Utils.getFileAttr(
+              dfsClient,
+              Nfs3Utils.getFileIdPath(handle),
+              iug);
     } catch (IOException e) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Get error accessing file, fileId:" + handle.getFileId());

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java Sat Feb 1 09:01:41 2014
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,9 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -58,9 +62,17 @@ public class TestReaddir {
   static RpcProgramNfs3 nfsd;
   static String testdir = "/tmp";
   static SecurityHandler securityHandler;
-  
+
   @BeforeClass
   public static void setup() throws Exception {
+    String currentUser = System.getProperty("user.name");
+    config.set(
+            ProxyUsers.getProxySuperuserGroupConfKey(currentUser),
+            "*");
+    config.set(
+            ProxyUsers.getProxySuperuserIpConfKey(currentUser),
+            "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
     cluster.waitActive();
     hdfs = cluster.getFileSystem();

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Sat Feb 1 09:01:41 2014
@@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.core.Is.is;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
 
 public class TestDFSClientCache {
@@ -49,6 +52,28 @@ public class TestDFSClientCache {
     assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
   }
 
+  @Test
+  public void testGetUserGroupInformation() throws IOException {
+    String userName = "user1";
+    String currentUser = "currentUser";
+
+    UserGroupInformation currentUserUgi = UserGroupInformation
+            .createUserForTesting(currentUser, new String[0]);
+    currentUserUgi.setAuthenticationMethod(
+            UserGroupInformation.AuthenticationMethod.KERBEROS);
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
+    DFSClientCache cache = new DFSClientCache(conf);
+    UserGroupInformation ugiResult
+            = cache.getUserGroupInformation(userName, currentUserUgi);
+
+    assertThat(ugiResult.getUserName(), is(userName));
+    assertThat(ugiResult.getRealUser(), is(currentUserUgi));
+    assertThat(
+            ugiResult.getAuthenticationMethod(),
+            is(UserGroupInformation.AuthenticationMethod.PROXY));
+  }
+
   private static boolean isDfsClientClose(DFSClient c) {
     try {
       c.exists("");

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Sat Feb 1 09:01:41 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.READ3Response;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.jboss.netty.channel.Channel;
 import org.junit.Assert;
 import org.junit.Test;
@@ -285,6 +286,14 @@ public class TestWrites {
     SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
     Mockito.when(securityHandler.getUser()).thenReturn(
         System.getProperty("user.name"));
+    String currentUser = System.getProperty("user.name");
+    config.set(
+            ProxyUsers.getProxySuperuserGroupConfKey(currentUser),
+            "*");
+    config.set(
+            ProxyUsers.getProxySuperuserIpConfKey(currentUser),
+            "*");
+    ProxyUsers.refreshSuperUserGroupsConfiguration(config);
     try {
       cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Feb 1 09:01:41 2014
@@ -297,6 +297,17 @@ Release 2.4.0 - UNRELEASED
     HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and
     the corresponding byte value. (jing9)
 
+    HDFS-5153. Datanode should send block reports for each storage in a
+    separate message. (Arpit Agarwal)
+
+    HDFS-5804. HDFS NFS Gateway fails to mount and proxy when using Kerberos.
+    (Abin Shahab via jing9)
+
+    HDFS-5859. DataNode#checkBlockToken should check block tokens even if
+    security is not enabled. (cmccabe)
+
+    HDFS-5746. Add ShortCircuitSharedMemorySegment (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
 
@@ -310,6 +321,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5843. DFSClient.getFileChecksum() throws IOException if checksum is
     disabled. (Laurent Goujon via jing9)
 
+    HDFS-5856. DataNode.checkDiskError might throw NPE.
+    (Josh Elser via suresh)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1563042-1563384

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Sat Feb 1 09:01:41 2014
@@ -399,6 +399,8 @@ public class DFSConfigKeys extends Commo
   public static final long   DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000;
   public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final int    DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
+  public static final String DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
+  public static final long   DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
   public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
   public static final long   DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
   public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sat Feb 1 09:01:41 2014
@@ -1635,15 +1635,19 @@ public class BlockManager {
   /**
    * The given storage is reporting all its blocks.
    * Update the (storage-->block list) and (block-->storage list) maps.
+   *
+   * @return true if all known storages of the given DN have finished reporting.
+   * @throws IOException
    */
-  public void processReport(final DatanodeID nodeID,
+  public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
     final long endTime;
+    DatanodeDescriptor node;
     try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
@@ -1651,13 +1655,21 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
+      DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+
+      if (storageInfo == null) {
+        // We handle this for backwards compatibility.
+        storageInfo = node.updateStorage(storage);
+        LOG.warn("Unknown storageId " + storage.getStorageID() +
+            ", updating storageMap. This indicates a buggy " +
+            "DataNode that isn't heartbeating correctly.");
+      }
       if (namesystem.isInStartupSafeMode()
           && storageInfo.getBlockReportCount() > 0) {
         blockLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID
             + " because namenode still in startup phase");
-        return;
+        return !node.hasStaleStorages();
       }
 
       if (storageInfo.numBlocks() == 0) {
@@ -1674,7 +1686,7 @@ public class BlockManager {
       storageInfo.receivedBlockReport();
       if (staleBefore && !storageInfo.areBlockContentsStale()) {
         LOG.info("BLOCK* processReport: Received first block report from "
-            + node + " after starting up or becoming active. Its block "
+            + storage + " after starting up or becoming active. Its block "
            + "contents are no longer considered stale");
        rescanPostponedMisreplicatedBlocks();
      }
@@ -1689,9 +1701,10 @@ public class BlockManager {
     if (metrics != null) {
       metrics.addBlockReport((int) (endTime - startTime));
     }
-    blockLog.info("BLOCK* processReport: from "
-        + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+    blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
+        + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
         + ", processing time: " + (endTime - startTime) + " msecs");
+    return !node.hasStaleStorages();
   }
 
   /**
@@ -1846,7 +1859,7 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
+    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
 
     // place a delimiter in the list which separates blocks
     // that have been reported from those that have not

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Sat Feb 1 09:01:41 2014
@@ -257,6 +257,17 @@ public class DatanodeDescriptor extends
     }
   }
 
+  boolean hasStaleStorages() {
+    synchronized (storageMap) {
+      for (DatanodeStorageInfo storage : storageMap.values()) {
+        if (storage.areBlockContentsStale()) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
   /**
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
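[Editor's illustration] The new DatanodeDescriptor.hasStaleStorages() accessor, combined with processReport() now returning a boolean, gives the NameNode a per-RPC answer to "has every storage on this DataNode checked in yet?". A condensed sketch of the calling pattern (the NameNodeRpcServer hunk further down is the real consumer; the wrapper class and method here are invented for illustration):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
    import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;

    class StaleStorageSketch {
      // Feed every per-storage report to the BlockManager and report
      // whether the node as a whole has finished reporting.
      static boolean processAll(BlockManager bm, DatanodeID nodeReg,
          String poolId, StorageBlockReport[] reports) throws IOException {
        boolean allStoragesReported = false;
        for (StorageBlockReport r : reports) {
          BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
          // Each call returns !node.hasStaleStorages() evaluated after the
          // report is applied, so the last value reflects the final state.
          allStoragesReported =
              bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
        }
        return allStoragesReported;
      }
    }

The payoff shows up in NameNodeRpcServer.blockReport() below, where the decision to hand out a FinalizeCommand is now gated on the staleness of the node's storages as a whole rather than on a single storage's report.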
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Sat Feb 1 09:01:41 2014
@@ -22,11 +22,9 @@ import static org.apache.hadoop.util.Tim
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -435,75 +433,100 @@ class BPServiceActor implements Runnable
 
   /**
    * Report the list of blocks to the Namenode
+   * @return DatanodeCommands returned by the NN. May be null.
    * @throws IOException
    */
-  DatanodeCommand blockReport() throws IOException {
+  List<DatanodeCommand> blockReport() throws IOException {
     // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
+    final long startTime = now();
+    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
+      return null;
+    }
+
+    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
 
-      // Flush any block information that precedes the block report. Otherwise
-      // we have a chance that we will miss the delHint information
-      // or we will report an RBW replica after the BlockReport already reports
-      // a FINALIZED one.
-      reportReceivedDeletedBlocks();
-
-      // Send one block report per known storage.
-
-      // Create block report
-      long brCreateStartTime = now();
-      long totalBlockCount = 0;
-
-      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-          dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
-
-      // Send block report
-      long brSendStartTime = now();
-      StorageBlockReport[] reports =
-          new StorageBlockReport[perVolumeBlockLists.size()];
-
-      int i = 0;
-      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        totalBlockCount += blockList.getNumberOfBlocks();
-
-        reports[i++] =
-            new StorageBlockReport(
-                dnStorage, blockList.getBlockListAsLongs());
-      }
-
-      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
-
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.getMetrics().addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + totalBlockCount
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
-
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-            dnConf.blockReportInterval * dnConf.blockReportInterval;
+    // Flush any block information that precedes the block report. Otherwise
+    // we have a chance that we will miss the delHint information
+    // or we will report an RBW replica after the BlockReport already reports
+    // a FINALIZED one.
+    reportReceivedDeletedBlocks();
+    lastDeletedReport = startTime;
+
+    long brCreateStartTime = now();
+    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
+
+    // Convert the reports to the format expected by the NN.
+    int i = 0;
+    int totalBlockCount = 0;
+    StorageBlockReport reports[] =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+
+    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      BlockListAsLongs blockList = kvPair.getValue();
+      reports[i++] = new StorageBlockReport(
+          kvPair.getKey(), blockList.getBlockListAsLongs());
+      totalBlockCount += blockList.getNumberOfBlocks();
+    }
+
+    // Send the reports to the NN.
+    int numReportsSent;
+    long brSendStartTime = now();
+    if (totalBlockCount < dnConf.blockReportSplitThreshold) {
+      // Below split threshold, send all reports in a single message.
+      numReportsSent = 1;
+      DatanodeCommand cmd =
+          bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
+      if (cmd != null) {
+        cmds.add(cmd);
+      }
+    } else {
+      // Send one block report per message.
+      numReportsSent = i;
+      for (StorageBlockReport report : reports) {
+        StorageBlockReport singleReport[] = { report };
+        DatanodeCommand cmd = bpNamenode.blockReport(
+            bpRegistration, bpos.getBlockPoolId(), singleReport);
+        if (cmd != null) {
+          cmds.add(cmd);
+        }
       }
-      LOG.info("sent block report, processed command:" + cmd);
     }
-    return cmd;
+
+    // Log the block report processing stats from Datanode perspective
+    long brSendCost = now() - brSendStartTime;
+    long brCreateCost = brSendStartTime - brCreateStartTime;
+    dn.getMetrics().addBlockReport(brSendCost);
+    LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
+        " blocks total. Took " + brCreateCost +
+        " msec to generate and " + brSendCost +
+        " msecs for RPC and NN processing. " +
+        " Got back commands " +
+        (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
+
+    scheduleNextBlockReport(startTime);
+    return cmds.size() == 0 ? null : cmds;
+  }
+
+  private void scheduleNextBlockReport(long previousReportStartTime) {
+    // If we have sent the first set of block reports, then wait a random
+    // time before we start the periodic block reports.
+    if (resetBlockReportTime) {
+      lastBlockReport = previousReportStartTime -
+          DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+      resetBlockReportTime = false;
+    } else {
+      /* say the last block report was at 8:20:14. The current report
+       * should have started around 9:20:14 (default 1 hour interval).
+       * If current time is :
+       *   1) normal like 9:20:18, next report should be at 10:20:14
+       *   2) unexpected like 11:35:43, next report should be at 12:20:14
+       */
+      lastBlockReport += (now() - lastBlockReport) /
+          dnConf.blockReportInterval * dnConf.blockReportInterval;
+    }
+  }
-  
+
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
     if (dn.getFSDataset().getCacheCapacity() == 0) {
@@ -511,7 +534,7 @@ class BPServiceActor implements Runnable
     }
     // send cache report if timer has expired.
     DatanodeCommand cmd = null;
-    long startTime = Time.monotonicNow();
+    final long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending cacheReport from service actor: " + this);
@@ -611,7 +634,7 @@ class BPServiceActor implements Runnable
       //
       while (shouldRun()) {
         try {
-          long startTime = now();
+          final long startTime = now();
 
           //
           // Every so often, send heartbeat or block-report
@@ -657,10 +680,10 @@ class BPServiceActor implements Runnable
             lastDeletedReport = startTime;
           }
 
-          DatanodeCommand cmd = blockReport();
-          processCommand(new DatanodeCommand[]{ cmd });
+          List<DatanodeCommand> cmds = blockReport();
+          processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-          cmd = cacheReport();
+          DatanodeCommand cmd = cacheReport();
           processCommand(new DatanodeCommand[]{ cmd });
 
           // Now safe to start scanning the block pool.
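[Editor's illustration] The comment block in scheduleNextBlockReport() carries the whole scheduling story, and the arithmetic is worth checking: the integer division truncates, so a late report advances lastBlockReport by a whole number of intervals and the cadence stays anchored to the original schedule instead of drifting. A self-contained check of the comment's own example (plain Java, not Hadoop code):

    public class BlockReportScheduleDemo {
      // Same arithmetic as scheduleNextBlockReport(): advance the last-report
      // timestamp by a whole number of intervals, never past "now".
      static long advance(long lastBlockReport, long now, long interval) {
        return lastBlockReport + (now - lastBlockReport) / interval * interval;
      }

      static long hms(int h, int m, int s) {
        return ((h * 60L + m) * 60 + s) * 1000;
      }

      static String fmt(long t) {
        long s = t / 1000;
        return String.format("%d:%02d:%02d", s / 3600, (s / 60) % 60, s % 60);
      }

      public static void main(String[] args) {
        final long HOUR = 60 * 60 * 1000L;
        long last = hms(8, 20, 14);
        // Report ran roughly on time at 9:20:18 -> prints 10:20:14.
        System.out.println(fmt(advance(last, hms(9, 20, 18), HOUR) + HOUR));
        // Report ran very late at 11:35:43 -> prints 12:20:14, not 12:35:43.
        System.out.println(fmt(advance(last, hms(11, 35, 43), HOUR) + HOUR));
      }
    }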
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Sat Feb 1 09:01:41 2014
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -70,6 +72,7 @@ public class DNConf {
   final long readaheadLength;
   final long heartBeatInterval;
   final long blockReportInterval;
+  final long blockReportSplitThreshold;
   final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
@@ -117,6 +120,8 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
         DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Feb 1 09:01:41 2014
@@ -36,6 +36,7 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -51,7 +52,6 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.management.ObjectName;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -1194,7 +1194,7 @@ public class DataNode extends Configured
   private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
       AccessMode accessMode) throws IOException {
-    if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
+    if (isBlockTokenEnabled) {
       BlockTokenIdentifier id = new BlockTokenIdentifier();
       ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
       DataInputStream in = new DataInputStream(buf);
@@ -1324,12 +1324,7 @@ public class DataNode extends Configured
   protected void checkDiskError(Exception e ) throws IOException {
     LOG.warn("checkDiskError: exception: ", e);
-    if (e instanceof SocketException || e instanceof SocketTimeoutException
-        || e instanceof ClosedByInterruptException
-        || e.getMessage().startsWith("An established connection was aborted")
-        || e.getMessage().startsWith("Broken pipe")
-        || e.getMessage().startsWith("Connection reset")
-        || e.getMessage().contains("java.nio.channels.SocketChannel")) {
+    if (isNetworkRelatedException(e)) {
       LOG.info("Not checking disk as checkDiskError was called on a network" +
           " related exception");
       return;
@@ -1343,6 +1338,28 @@ public class DataNode extends Configured
   }
 
   /**
+   * Check if the provided exception looks like it's from a network error
+   * @param e the exception from a checkDiskError call
+   * @return true if this exception is network related, false otherwise
+   */
+  protected boolean isNetworkRelatedException(Exception e) {
+    if (e instanceof SocketException
+        || e instanceof SocketTimeoutException
+        || e instanceof ClosedChannelException
+        || e instanceof ClosedByInterruptException) {
+      return true;
+    }
+
+    String msg = e.getMessage();
+
+    return null != msg
+        && (msg.startsWith("An established connection was aborted")
+            || msg.startsWith("Broken pipe")
+            || msg.startsWith("Connection reset")
+            || msg.contains("java.nio.channels.SocketChannel"));
+  }
+
+  /**
    * Check if there is a disk failure and if so, handle the error
    */
   public void checkDiskError() {
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Sat Feb 1 09:01:41 2014
@@ -998,13 +998,18 @@ class NameNodeRpcServer implements Namen
           + "from " + nodeReg + ", reports.length=" + reports.length);
     }
     final BlockManager bm = namesystem.getBlockManager();
+    boolean hasStaleStorages = true;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
-      bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
     }
 
-    if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
+    if (nn.getFSImage().isUpgradeFinalized() &&
+        !nn.isStandbyState() &&
+        !hasStaleStorages) {
       return new FinalizeCommand(poolId);
+    }
+
     return null;
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Sat Feb 1 09:01:41 2014
@@ -483,6 +483,20 @@
 </property>
 
 <property>
+  <name>dfs.blockreport.split.threshold</name>
+  <value>1000000</value>
+  <description>If the number of blocks on the DataNode is below this
+  threshold then it will send block reports for all Storage Directories
+  in a single message.
+
+  If the number of blocks exceeds this threshold then the DataNode will
+  send block reports for each Storage Directory in separate messages.
+
+  Set to zero to always split.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.directoryscan.interval</name>
  <value>21600</value>
  <description>Interval in seconds for Datanode to scan data directories and
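[Editor's illustration] For operators, the new knob surfaces as dfs.blockreport.split.threshold. As a hypothetical hdfs-site.xml override, forcing one report message per storage directory regardless of block count would look like:

    <property>
      <name>dfs.blockreport.split.threshold</name>
      <value>0</value>
    </property>

The default of 1000000 means small and mid-sized DataNodes keep the old single-message behavior, while very dense nodes split automatically.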
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1563385&r1=1563384&r2=1563385&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Sat Feb 1 09:01:41 2014
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.DataOutputStream;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -196,4 +200,16 @@ public class TestDiskError {
       }
     }
   }
+
+  @Test
+  public void testNetworkErrorsIgnored() {
+    DataNode dn = cluster.getDataNodes().iterator().next();
+
+    assertTrue(dn.isNetworkRelatedException(new SocketException()));
+    assertTrue(dn.isNetworkRelatedException(new SocketTimeoutException()));
+    assertTrue(dn.isNetworkRelatedException(new ClosedChannelException()));
+    assertTrue(dn.isNetworkRelatedException(new Exception("Broken pipe foo bar")));
+    assertFalse(dn.isNetworkRelatedException(new Exception()));
+    assertFalse(dn.isNetworkRelatedException(new Exception("random problem")));
+  }
 }