Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Apr 23 21:37:55 2012 @@ -21,10 +21,13 @@ package org.apache.hadoop.hdfs.server.da import static org.apache.hadoop.hdfs.server.common.Util.now; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -49,6 +52,9 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import javax.management.MBeanServer; +import javax.management.ObjectName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -64,25 +70,27 @@ import org.apache.hadoop.hdfs.HDFSPolicy import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; -import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; @@ -92,6 +100,7 @@ import org.apache.hadoop.hdfs.server.nam import 
org.apache.hadoop.hdfs.server.namenode.StreamFile; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -101,7 +110,6 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; @@ -109,27 +117,24 @@ import org.apache.hadoop.ipc.RemoteExcep import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionInfo; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.mortbay.util.ajax.JSON; -import java.lang.management.ManagementFactory; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - /********************************************************** * DataNode is a class (and program) that stores a set of * blocks for a DFS deployment. A single deployment can @@ -230,6 +235,8 @@ public class DataNode extends Configured BlockTokenSecretManager blockTokenSecretManager; boolean isBlockTokenInitialized = false; + final String userWithLocalPathAccess; + public DataBlockScanner blockScanner = null; public Daemon blockScannerThread = null; @@ -276,6 +283,9 @@ public class DataNode extends Configured DataNode.setDataNode(this); + this.userWithLocalPathAccess = conf + .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY); + try { startDataNode(conf, dataDirs, namenode, resources); } catch (IOException ie) { @@ -1753,6 +1763,67 @@ public class DataNode extends Configured + ": " + protocol); } + /** Ensure the authentication method is kerberos */ + private void checkKerberosAuthMethod(String msg) throws IOException { + // User invoking the call must be same as the datanode user + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() != + AuthenticationMethod.KERBEROS) { + throw new AccessControlException("Error in "+msg+". 
Only " + + "kerberos based authentication is allowed."); + } + } + + private void checkBlockLocalPathAccess() throws IOException { + checkKerberosAuthMethod("getBlockLocalPathInfo()"); + String currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + if (!currentUser.equals(this.userWithLocalPathAccess)) { + throw new AccessControlException( + "Can't continue with getBlockLocalPathInfo() " + + "authorization. The user " + currentUser + + " is not allowed to call getBlockLocalPathInfo"); + } + } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(Block block, + Token<BlockTokenIdentifier> token) throws IOException { + checkBlockLocalPathAccess(); + checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); + BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); + if (LOG.isDebugEnabled()) { + if (info != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("getBlockLocalPathInfo successful block=" + block + + " blockfile " + info.getBlockPath() + " metafile " + + info.getMetaPath()); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("getBlockLocalPathInfo for block=" + block + + " returning null"); + } + } + } + return info; + } + + private void checkBlockToken(Block block, Token<BlockTokenIdentifier> token, + AccessMode accessMode) throws IOException { + if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) { + BlockTokenIdentifier id = new BlockTokenIdentifier(); + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + id.readFields(in); + if (LOG.isDebugEnabled()) { + LOG.debug("Got: " + id.toString()); + } + blockTokenSecretManager.checkAccess(id, null, block, accessMode); + } + } + /** A convenient class used in block recovery */ static class BlockRecord { final DatanodeID id;
Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Apr 23 21:37:55 2012 @@ -47,23 +47,24 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; -import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.StringUtils; /************************************************** * FSDataset manages a set of data blocks. 
Each block @@ -953,6 +954,17 @@ public class FSDataset implements FSCons } @Override // FSDatasetInterface + public BlockLocalPathInfo getBlockLocalPathInfo(Block block) + throws IOException { + File datafile = getBlockFile(block); + File metafile = getMetaFile(datafile, block); + BlockLocalPathInfo info = new BlockLocalPathInfo(block, + datafile.getAbsolutePath(), metafile.getAbsolutePath()); + return info; + } + + + @Override // FSDatasetInterface public synchronized InputStream getBlockInputStream(Block b) throws IOException { return new FileInputStream(getBlockFile(b)); } Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Mon Apr 23 21:37:55 2012 @@ -25,11 +25,12 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; -import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; -import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -343,4 +344,9 @@ public interface FSDatasetInterface exte Block oldBlock, long recoveryId, long newLength) throws IOException; + + /** + * Get {@link BlockLocalPathInfo} for the given block. + **/ + public BlockLocalPathInfo getBlockLocalPathInfo(Block b) throws IOException; } Modified: hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Apr 23 21:37:55 2012 @@ -38,6 +38,7 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.RemoteBlockReader; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; @@ -502,8 +503,8 @@ public class NamenodeFsck { s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, block.getBlockId()); - blockReader = BlockReader.newBlockReader(s, file, block, lblock + String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId()); + blockReader = RemoteBlockReader.newBlockReader(s, file, block, lblock .getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096)); } catch (IOException ex) { Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/commit-tests Mon Apr 23 21:37:55 2012 @@ -11,6 +11,7 @@ **/TestDatanodeDescriptor.java **/TestEditLog.java **/TestFileLimit.java +**/TestShortCircuitLocalRead.java **/TestHeartbeatHandling.java **/TestHost2NodesMap.java **/TestNamenodeCapacityReport.java Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Apr 23 21:37:55 2012 @@ -132,7 +132,7 @@ public class BlockReaderTestUtil { /** * Get a BlockReader for the given block. 
*/ - public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) + public RemoteBlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) throws IOException { InetSocketAddress targetAddr = null; Socket sock = null; @@ -143,7 +143,7 @@ public class BlockReaderTestUtil { sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT); sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); - return BlockReader.newBlockReader( + return RemoteBlockReader.newBlockReader( sock, targetAddr.toString()+ ":" + block.getBlockId(), block, testBlock.getBlockToken(), offset, lenToRead, Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Mon Apr 23 21:37:55 2012 @@ -54,7 +54,7 @@ public class TestClientBlockVerification */ @Test public void testBlockVerification() throws Exception { - BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); + RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); @@ -65,7 +65,7 @@ public class TestClientBlockVerification */ @Test public void testIncompleteRead() throws Exception { - BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); + RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); // We asked the blockreader for the whole file, and only read @@ -82,7 +82,7 @@ public class TestClientBlockVerification @Test public void testCompletePartialRead() throws Exception { // Ask for half the file - BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); + RemoteBlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); @@ -101,7 +101,7 @@ public class TestClientBlockVerification for (int length : lengths) { DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " + " len=" + length); - BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length)); + RemoteBlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java (original) +++ 
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java Mon Apr 23 21:37:55 2012 @@ -71,13 +71,13 @@ public class TestConnCache { * It verifies that all invocation to DFSInputStream.getBlockReader() * use the same socket. */ - private class MockGetBlockReader implements Answer<BlockReader> { - public BlockReader reader = null; + private class MockGetBlockReader implements Answer<RemoteBlockReader> { + public RemoteBlockReader reader = null; private Socket sock = null; - public BlockReader answer(InvocationOnMock invocation) throws Throwable { - BlockReader prevReader = reader; - reader = (BlockReader) invocation.callRealMethod(); + public RemoteBlockReader answer(InvocationOnMock invocation) throws Throwable { + RemoteBlockReader prevReader = reader; + reader = (RemoteBlockReader) invocation.callRealMethod(); if (sock == null) { sock = reader.dnSock; } else if (prevReader != null && prevReader.hasSentStatusCode()) { Added: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1329468&view=auto ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (added) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Apr 23 21:37:55 2012 @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertTrue; + +import java.io.EOFException; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseManager; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + + +/** + * Test for short circuit read functionality using {@link BlockReaderLocal}. + * When a block is being read by a client is on the local datanode, instead of + * using {@link DataTransferProtocol} and connect to datanode, the short circuit + * read allows reading the file directly from the files on the local file + * system. + */ +public class TestShortCircuitLocalRead { + static final String DIR = MiniDFSCluster.getBaseDirectory() + TestShortCircuitLocalRead.class.getSimpleName() + "/"; + + static final long SEED = 0xDEADBEEFL; + static final int BLOCKSIZE = 5120; + boolean simulatedStorage = false; + + // creates a file but does not close it + static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) + throws IOException { + FSDataOutputStream stm = fileSys.create(name, true, + fileSys.getConf().getInt("io.file.buffer.size", 4096), + (short)repl, (long)BLOCKSIZE); + return stm; + } + + static private void checkData(byte[] actual, int from, byte[] expected, + String message) { + checkData(actual, from, expected, actual.length, message); + } + + static private void checkData(byte[] actual, int from, byte[] expected, + int len, String message) { + for (int idx = 0; idx < len; idx++) { + if (expected[from + idx] != actual[idx]) { + Assert.fail(message + " byte " + (from + idx) + " differs. expected " + + expected[from + idx] + " actual " + actual[idx]); + } + } + } + + static void checkFileContent(FileSystem fs, Path name, byte[] expected, + int readOffset) throws IOException { + FSDataInputStream stm = fs.open(name); + byte[] actual = new byte[expected.length-readOffset]; + stm.readFully(readOffset, actual); + checkData(actual, readOffset, expected, "Read 2"); + stm.close(); + // Now read using a different API. + actual = new byte[expected.length-readOffset]; + stm = fs.open(name); + long skipped = stm.skip(readOffset); + Assert.assertEquals(skipped, readOffset); + //Read a small number of bytes first. 
+ int nread = stm.read(actual, 0, 3); + nread += stm.read(actual, nread, 2); + //Read across chunk boundary + nread += stm.read(actual, nread, 517); + checkData(actual, readOffset, expected, nread, "A few bytes"); + //Now read rest of it + while (nread < actual.length) { + int nbytes = stm.read(actual, nread, actual.length - nread); + if (nbytes < 0) { + throw new EOFException("End of file reached before reading fully."); + } + nread += nbytes; + } + checkData(actual, readOffset, expected, "Read 3"); + stm.close(); + } + + /** + * Test that file data can be read by reading the block file + * directly from the local store. + */ + public void doTestShortCircuitRead(boolean ignoreChecksum, int size, + int readOffset) throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + ignoreChecksum); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, + UserGroupInformation.getCurrentUser().getShortUserName()); + if (simulatedStorage) { + conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); + } + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(8). + format(true). + build(); + FileSystem fs = cluster.getFileSystem(); + try { + // check that / exists + Path path = new Path("/"); + assertTrue("/ should be a directory", + fs.getFileStatus(path).isDirectory() == true); + + byte[] fileData = AppendTestUtil.randomBytes(SEED, size); + // create a new file in home directory. Do not close it. + Path file1 = new Path("filelocal.dat"); + FSDataOutputStream stm = createFile(fs, file1, 1); + + // write to file + stm.write(fileData); + stm.close(); + checkFileContent(fs, file1, fileData, readOffset); + } finally { + fs.close(); + cluster.shutdown(); + } + } + + @Test + public void testFileLocalReadNoChecksum() throws IOException { + doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 0); + } + + @Test + public void testFileLocalReadChecksum() throws IOException { + doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 0); + } + + @Test + public void testSmallFileLocalRead() throws IOException { + doTestShortCircuitRead(false, 13, 0); + doTestShortCircuitRead(false, 13, 5); + doTestShortCircuitRead(true, 13, 0); + doTestShortCircuitRead(true, 13, 5); + } + + @Test + public void testReadFromAnOffset() throws IOException { + doTestShortCircuitRead(false, 3*BLOCKSIZE+100, 777); + doTestShortCircuitRead(true, 3*BLOCKSIZE+100, 777); + } + + @Test + public void testLongFile() throws IOException { + doTestShortCircuitRead(false, 10*BLOCKSIZE+100, 777); + doTestShortCircuitRead(true, 10*BLOCKSIZE+100, 777); + } + + @Test + public void testGetBlockLocalPathInfo() throws IOException, InterruptedException { + final Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser"); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); + cluster.waitActive(); + final DataNode dn = cluster.getDataNodes().get(0); + FileSystem fs = cluster.getFileSystem(); + try { + DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23); + UserGroupInformation aUgi = UserGroupInformation + .createRemoteUser("alloweduser"); + LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0, + 16); + // Create a new block object, because the block inside LocatedBlock at + // namenode is of type BlockInfo. 
+ Block blk = new Block(lb.get(0).getBlock()); + Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken(); + final DatanodeInfo dnInfo = lb.get(0).getLocations()[0]; + ClientDatanodeProtocol proxy = aUgi + .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { + @Override + public ClientDatanodeProtocol run() throws Exception { + return DFSClient.createClientDatanodeProtocolProxy( + dnInfo, conf, 60000); + } + }); + + //This should succeed + BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token); + Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(), + blpi.getBlockPath()); + RPC.stopProxy(proxy); + + // Now try with a not allowed user. + UserGroupInformation bUgi = UserGroupInformation + .createRemoteUser("notalloweduser"); + proxy = bUgi + .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { + @Override + public ClientDatanodeProtocol run() throws Exception { + return DFSClient.createClientDatanodeProtocolProxy( + dnInfo, conf, 60000); + } + }); + try { + proxy.getBlockLocalPathInfo(blk, token); + Assert.fail("The call should have failed as " + bUgi.getShortUserName() + + " is not allowed to call getBlockLocalPathInfo"); + } catch (IOException ex) { + Assert.assertTrue(ex.getMessage().contains( + "not allowed to call getBlockLocalPathInfo")); + } finally { + RPC.stopProxy(proxy); + } + } finally { + fs.close(); + cluster.shutdown(); + } + } + + /** + * Test to run benchmarks between shortcircuit read vs regular read with + * specified number of threads simultaneously reading. + * <br> + * Run this using the following command: + * bin/hadoop --config confdir \ + * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \ + * <shortcircuit on?> <checsum on?> <Number of threads> + */ + public static void main(String[] args) throws Exception { + ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO); + ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO); + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO); + + if (args.length != 3) { + System.out.println("Usage: test shortcircuit checksum threadCount"); + System.exit(1); + } + boolean shortcircuit = Boolean.valueOf(args[0]); + boolean checksum = Boolean.valueOf(args[1]); + int threadCount = Integer.valueOf(args[2]); + + // Setup create a file + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + checksum); + + //Override fileSize and DATA_TO_WRITE to much larger values for benchmark test + int fileSize = 1000 * BLOCKSIZE + 100; // File with 1000 blocks + final byte [] dataToWrite = AppendTestUtil.randomBytes(SEED, fileSize); + + // create a new file in home directory. Do not close it. 
+ final Path file1 = new Path("filelocal.dat"); + final FileSystem fs = FileSystem.get(conf); + FSDataOutputStream stm = createFile(fs, file1, 1); + + stm.write(dataToWrite); + stm.close(); + + long start = System.currentTimeMillis(); + final int iteration = 20; + Thread[] threads = new Thread[threadCount]; + for (int i = 0; i < threadCount; i++) { + threads[i] = new Thread() { + public void run() { + for (int i = 0; i < iteration; i++) { + try { + checkFileContent(fs, file1, dataToWrite, 0); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + }; + } + for (int i = 0; i < threadCount; i++) { + threads[i].start(); + } + for (int i = 0; i < threadCount; i++) { + threads[i].join(); + } + long end = System.currentTimeMillis(); + System.out.println("Iteration " + iteration + " took " + (end - start)); + fs.delete(file1, false); + } +} Propchange: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Mon Apr 23 21:37:55 2012 @@ -34,11 +34,12 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; -import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -836,4 +837,9 @@ public class SimulatedFSDataset impleme public long getReplicaVisibleLength(Block block) throws IOException { return block.getNumBytes(); } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(Block blk) throws IOException { + throw new IOException("getBlockLocalPathInfo not supported."); + } } Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original) +++ 
hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Apr 23 21:37:55 2012 @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -30,22 +33,21 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.RemoteBlockReader; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.net.NetUtils; - import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; /** * Fine-grain testing of block files and locations after volume failure. @@ -263,9 +265,9 @@ public class TestDataNodeVolumeFailure { s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, block.getBlockId()); + String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId()); blockReader = - BlockReader.newBlockReader(s, file, block, lblock + RemoteBlockReader.newBlockReader(s, file, block, lblock .getBlockToken(), 0, -1, 4096); // nothing - if it fails - it will throw and exception Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java Mon Apr 23 21:37:55 2012 @@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.da import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.RemoteBlockReader; import org.apache.hadoop.hdfs.BlockReaderTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -58,7 +58,7 @@ public class TestDataXceiver { @Test public void testCompletePartialRead() throws Exception { // Ask for half the file - BlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2); + RemoteBlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2); DataNode dn = util.getDataNode(testBlock); DataBlockScanner scanner = spy(dn.blockScanner); dn.blockScanner = scanner; Modified: hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1329468&r1=1329467&r2=1329468&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original) +++ hadoop/common/branches/branch-0.22/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Mon Apr 23 21:37:55 2012 @@ -24,29 +24,33 @@ import java.util.EnumSet; import java.util.List; import java.util.Random; +import junit.framework.TestCase; + import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.RemoteBlockReader; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.*; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; import org.apache.hadoop.hdfs.server.common.HdfsConstants; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.token.*; +import org.apache.hadoop.security.token.Token; import org.apache.log4j.Level; -import junit.framework.TestCase; - public class TestBlockTokenWithDFS extends TestCase { private static final int BLOCK_SIZE = 1024; @@ -130,8 +134,8 @@ public class TestBlockTokenWithDFS exten s.connect(targetAddr, HdfsConstants.READ_TIMEOUT); s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - String file = BlockReader.getFileName(targetAddr, block.getBlockId()); - blockReader = BlockReader.newBlockReader(s, file, block, + String file = RemoteBlockReader.getFileName(targetAddr, block.getBlockId()); + blockReader = RemoteBlockReader.newBlockReader(s, file, block, lblock.getBlockToken(), 0, -1, conf.getInt("io.file.buffer.size", 4096));
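Taken together, the client-side setup exercised by doTestShortCircuitRead() in the new test reduces to three configuration keys. The sketch below is illustrative only, assuming the DFSConfigKeys constants shown in this commit; "shortcircuituser" is a hypothetical placeholder for the account permitted to call getBlockLocalPathInfo() (in the test it is simply the current user's short name).

    // Minimal configuration sketch for short-circuit local reads.
    Configuration conf = new Configuration();
    // Let the client read block files directly when the block is local.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    // Optionally skip client-side checksum verification on local reads.
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    // Only this user may call ClientDatanodeProtocol#getBlockLocalPathInfo().
    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
        "shortcircuituser");

The same key must also be present in the datanode's configuration, which reads DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY in its constructor (see the DataNode.java change at the top of this diff).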
