HDFS-11575. Supporting HDFS NFS gateway with Federated HDFS. Contributed by Mukul Kumar Singh.
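[Editor's note, not part of the commit] The patch below widens the 32-byte opaque NFS file handle so that, besides the fileId carried in bytes 0-7, the id of the backing namenode is carried in bytes 8-11; each NFS3 operation then uses that namenodeId to pick the right DFSClient in a federated/ViewFs deployment. A minimal standalone sketch of the handle packing, in plain Java with no Hadoop classes (class and method names here are illustrative, not from the patch):

    import java.nio.ByteBuffer;

    public class FileHandleLayoutSketch {
      private static final int HANDLE_LEN = 32;

      // Pack fileId into bytes 0-7 and namenodeId into bytes 8-11, big-endian,
      // matching the new FileHandle(long fileId, int namenodeId) constructor;
      // bytes 12-31 stay zero.
      static byte[] pack(long fileId, int namenodeId) {
        byte[] handle = new byte[HANDLE_LEN];
        ByteBuffer.wrap(handle).putLong(fileId).putInt(namenodeId);
        return handle;
      }

      // Recover the ids, mirroring bytesToLong(handle, 0) and bytesToInt(handle, 8).
      static long fileIdOf(byte[] handle) {
        return ByteBuffer.wrap(handle, 0, 8).getLong();
      }

      static int namenodeIdOf(byte[] handle) {
        return ByteBuffer.wrap(handle, 8, 4).getInt();
      }

      public static void main(String[] args) {
        byte[] h = pack(16385L, 42);
        System.out.println(fileIdOf(h) + " " + namenodeIdOf(h)); // prints: 16385 42
      }
    }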
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6602b5f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6602b5f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6602b5f
Branch: refs/heads/YARN-5881
Commit: d6602b5f39833611b4afa4581552f6c4c37e23a8
Parents: ec8bf9e
Author: Jitendra Pandey <[email protected]>
Authored: Tue Oct 10 09:49:46 2017 -0700
Committer: Jitendra Pandey <[email protected]>
Committed: Tue Oct 10 10:38:05 2017 -0700
----------------------------------------------------------------------
 .../org/apache/hadoop/nfs/nfs3/FileHandle.java | 51 ++-
 .../hadoop/nfs/nfs3/request/WRITE3Request.java | 4 +-
 .../hadoop/hdfs/nfs/mount/RpcProgramMountd.java | 81 +++--
 .../hadoop/hdfs/nfs/nfs3/DFSClientCache.java | 174 ++++++---
 .../apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java | 46 +++
 .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 19 +-
 .../hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java | 6 +-
 .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java | 362 +++++++++++--------
 .../apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java | 8 +-
 .../hadoop/hdfs/nfs/nfs3/WriteManager.java | 24 +-
 .../nfs/nfs3/TestClientAccessPrivilege.java | 3 +-
 .../hdfs/nfs/nfs3/TestDFSClientCache.java | 13 +-
 .../hadoop/hdfs/nfs/nfs3/TestExportsTable.java | 161 ++++++++-
 .../hadoop/hdfs/nfs/nfs3/TestReaddir.java | 19 +-
 .../hdfs/nfs/nfs3/TestRpcProgramNfs3.java | 66 ++--
 .../hdfs/nfs/nfs3/TestViewfsWithNfs3.java | 330 +++++++++++++++++
 .../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java | 9 +-
 17 files changed, 1071 insertions(+), 305 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java
index 5b32798..910b8f2 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java
@@ -38,17 +38,21 @@ public class FileHandle { private static final int HANDLE_LEN = 32; private byte[] handle; // Opaque handle private long fileId = -1; + private int namenodeId = -1; public FileHandle() { handle = null; } /** - * Handle is a 32 bytes number. For HDFS, the last 8 bytes is fileId. + * Handle is a 32 bytes number. 
For HDFS, the last 8 bytes is fileId + * For ViewFs, last 8 byte is fileId while 4 bytes before that is namenodeId * @param v file id + * @param n namenode id */ - public FileHandle(long v) { + public FileHandle(long v, int n) { fileId = v; + namenodeId = n; handle = new byte[HANDLE_LEN]; handle[0] = (byte)(v >>> 56); handle[1] = (byte)(v >>> 48); @@ -58,11 +62,20 @@ public class FileHandle { handle[5] = (byte)(v >>> 16); handle[6] = (byte)(v >>> 8); handle[7] = (byte)(v >>> 0); - for (int i = 8; i < HANDLE_LEN; i++) { + + handle[8] = (byte) (n >>> 24); + handle[9] = (byte) (n >>> 16); + handle[10] = (byte) (n >>> 8); + handle[11] = (byte) (n >>> 0); + for (int i = 12; i < HANDLE_LEN; i++) { handle[i] = (byte) 0; } } - + + public FileHandle(long v) { + this(v, 0); + } + public FileHandle(String s) { MessageDigest digest; try { @@ -93,22 +106,32 @@ public class FileHandle { return true; } - private long bytesToLong(byte[] data) { + private long bytesToLong(byte[] data, int offset) { ByteBuffer buffer = ByteBuffer.allocate(8); for (int i = 0; i < 8; i++) { - buffer.put(data[i]); + buffer.put(data[i + offset]); } - buffer.flip();// need flip + buffer.flip(); // need flip return buffer.getLong(); } - + + private int bytesToInt(byte[] data, int offset) { + ByteBuffer buffer = ByteBuffer.allocate(4); + for (int i = 0; i < 4; i++) { + buffer.put(data[i + offset]); + } + buffer.flip(); // need flip + return buffer.getInt(); + } + public boolean deserialize(XDR xdr) { if (!XDR.verifyLength(xdr, 32)) { return false; } int size = xdr.readInt(); handle = xdr.readFixedOpaque(size); - fileId = bytesToLong(handle); + fileId = bytesToLong(handle, 0); + namenodeId = bytesToInt(handle, 8); return true; } @@ -122,11 +145,15 @@ public class FileHandle { public long getFileId() { return fileId; } + + public int getNamenodeId() { + return namenodeId; + } public byte[] getContent() { return handle.clone(); } - + @Override public String toString() { StringBuilder s = new StringBuilder(); @@ -154,4 +181,8 @@ public class FileHandle { public int hashCode() { return Arrays.hashCode(handle); } + + public String dumpFileHandle() { + return "fileId: " + fileId + " namenodeId: " + namenodeId; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java index d85dcbb..86a40c7 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java @@ -87,7 +87,7 @@ public class WRITE3Request extends RequestWithHandle { @Override public String toString() { - return String.format("fileId: %d offset: %d count: %d stableHow: %s", - handle.getFileId(), offset, count, stableHow.name()); + return String.format("fileHandle: %s offset: %d count: %d stableHow: %s", + handle.dumpFileHandle(), offset, count, stableHow.name()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java index e31bc71..4ae51c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java @@ -19,17 +19,20 @@ package org.apache.hadoop.hdfs.nfs.mount; import java.io.IOException; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.URI; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; +import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.mount.MountEntry; import org.apache.hadoop.mount.MountInterface; @@ -64,14 +67,12 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { public static final int VERSION_2 = 2; public static final int VERSION_3 = 3; - private final DFSClient dfsClient; - - /** Synchronized list */ + /** Synchronized list. */ private final List<MountEntry> mounts; - /** List that is unmodifiable */ - private final List<String> exports; - + /** List that is unmodifiable. */ + private final HashMap<String, URI> exports; + private final NfsConfiguration config; private final NfsExports hostsMatcher; public RpcProgramMountd(NfsConfiguration config, @@ -84,17 +85,29 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { VERSION_3, registrationSocket, allowInsecurePorts, config.getInt( NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY, NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT)); - exports = new ArrayList<String>(); - exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, - NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); + this.config = config; + exports = new HashMap<>(); + addExports(); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); - this.dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config); } - + + private void addExports() throws IOException { + FileSystem fs = FileSystem.get(config); + String[] exportsPath = + config.getStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, + NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT); + for (String exportPath : exportsPath) { + URI exportURI = Nfs3Utils.getResolvedURI(fs, exportPath); + LOG.info("FS:" + fs.getScheme() + " adding export Path:" + exportPath + + " with URI: " + exportURI.toString()); + exports.put(exportPath, exportURI); + } + } + @Override public XDR nullOp(XDR out, int xid, InetAddress client) { if (LOG.isDebugEnabled()) { @@ -125,17 +138,28 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { if (LOG.isDebugEnabled()) { LOG.debug("Got host: " + host + " path: " + path); } - if 
(!exports.contains(path)) { + URI exportURI = exports.get(path); + if (exportURI == null) { LOG.info("Path " + path + " is not shared."); MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null); return out; } + DFSClient dfsClient = null; + try { + dfsClient = new DFSClient(exportURI, config); + } catch (Exception e) { + LOG.error("Can't get handle for export:" + path, e); + MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null); + return out; + } + FileHandle handle = null; try { - HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path); - - handle = new FileHandle(exFileStatus.getFileId()); + HdfsFileStatus exFileStatus = dfsClient.getFileInfo(exportURI.getPath()); + + handle = new FileHandle(exFileStatus.getFileId(), + Nfs3Utils.getNamenodeId(config, exportURI)); } catch (IOException e) { LOG.error("Can't get handle for export:" + path, e); MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null); @@ -143,7 +167,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { } assert (handle != null); - LOG.info("Giving handle (fileId:" + handle.getFileId() + LOG.info("Giving handle (fileHandle:" + handle.dumpFileHandle() + + " file URI: " + exportURI + ") to client for export " + path); mounts.add(new MountEntry(host, path)); @@ -195,7 +220,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { info.data().readBytes(data); XDR xdr = new XDR(data); XDR out = new XDR(); - InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress(); + InetAddress client = + ((InetSocketAddress) info.remoteAddress()).getAddress(); if (mntproc == MNTPROC.NULL) { out = nullOp(out, xid, client); @@ -214,16 +240,20 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { } else if (mntproc == MNTPROC.UMNTALL) { umntall(out, xid, client); } else if (mntproc == MNTPROC.EXPORT) { - // Currently only support one NFS export + // Currently only support one NFS export per namenode List<NfsExports> hostsMatchers = new ArrayList<NfsExports>(); if (hostsMatcher != null) { - hostsMatchers.add(hostsMatcher); - out = MountResponse.writeExportList(out, xid, exports, hostsMatchers); + List exportsList = getExports(); + for (int i = 0; i < exportsList.size(); i++) { + hostsMatchers.add(hostsMatcher); + } + out = MountResponse.writeExportList(out, xid, + exportsList, hostsMatchers); } else { // This means there are no valid exports provided. 
RpcAcceptedReply.getInstance(xid, - RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( - out); + RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()) + .write(out); } } else { // Invalid procedure @@ -231,7 +261,8 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + ChannelBuffer buf = + ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } @@ -244,6 +275,6 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { @VisibleForTesting public List<String> getExports() { - return this.exports; + return new ArrayList<>(this.exports.keySet()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index b946bce..9a9366f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import org.apache.commons.logging.LogFactory; import java.io.IOException; +import java.net.URI; +import java.nio.file.FileSystemException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; @@ -31,9 +34,10 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInputStream; -import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.security.UserGroupInformation; @@ -48,63 +52,97 @@ import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; /** - * A cache saves DFSClient objects for different users + * A cache saves DFSClient objects for different users. */ class DFSClientCache { private static final Log LOG = LogFactory.getLog(DFSClientCache.class); /** * Cache that maps User id to the corresponding DFSClient. */ - @VisibleForTesting - final LoadingCache<String, DFSClient> clientCache; + private final LoadingCache<DfsClientKey, DFSClient> clientCache; final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256; /** - * Cache that maps <DFSClient, inode path> to the corresponding + * Cache that maps <DFSClient, inode path, nnid> to the corresponding * FSDataInputStream. 
*/ - final LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputstreamCache; + private final LoadingCache<DFSInputStreamCacheKey, + FSDataInputStream> inputstreamCache; /** - * Time to live for a DFSClient (in seconds) + * Time to live for a DFSClient (in seconds). */ final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024; final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60; private final NfsConfiguration config; + private final HashMap<Integer, URI> namenodeUriMap; - private static class DFSInputStreamCaheKey { - final String userId; - final String inodePath; + private static final class DFSInputStreamCacheKey { + private final String userId; + private final String inodePath; + private final int namenodeId; - private DFSInputStreamCaheKey(String userId, String inodePath) { + private DFSInputStreamCacheKey(String userId, String inodePath, + int namenodeId) { super(); this.userId = userId; this.inodePath = inodePath; + this.namenodeId = namenodeId; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DFSInputStreamCacheKey) { + DFSInputStreamCacheKey k = (DFSInputStreamCacheKey) obj; + return userId.equals(k.userId) && + inodePath.equals(k.inodePath) && + (namenodeId == k.namenodeId); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(userId, inodePath, namenodeId); + } + } + + private static final class DfsClientKey { + private final String userName; + private final int namenodeId; + + private DfsClientKey(String userName, int namenodeId) { + this.userName = userName; + this.namenodeId = namenodeId; } @Override public boolean equals(Object obj) { - if (obj instanceof DFSInputStreamCaheKey) { - DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj; - return userId.equals(k.userId) && inodePath.equals(k.inodePath); + if (obj instanceof DfsClientKey) { + DfsClientKey k = (DfsClientKey) obj; + return userName.equals(k.userName) && + (namenodeId == k.namenodeId); } return false; } @Override public int hashCode() { - return Objects.hashCode(userId, inodePath); + return Objects.hashCode(userName, namenodeId); } } - DFSClientCache(NfsConfiguration config) { + DFSClientCache(NfsConfiguration config) throws IOException { this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE); } - - DFSClientCache(NfsConfiguration config, int clientCache) { + + DFSClientCache(NfsConfiguration config, int clientCache) throws IOException { this.config = config; + namenodeUriMap = new HashMap<>(); + prepareAddressMap(); + this.clientCache = CacheBuilder.newBuilder() .maximumSize(clientCache) .removalListener(clientRemovalListener()) @@ -115,11 +153,36 @@ class DFSClientCache { .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS) .removalListener(inputStreamRemovalListener()) .build(inputStreamLoader()); - + ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(), SHUTDOWN_HOOK_PRIORITY); } + private void prepareAddressMap() throws IOException { + FileSystem fs = FileSystem.get(config); + String[] exportsPath = + config.getStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, + NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT); + for (String exportPath : exportsPath) { + URI exportURI = Nfs3Utils.getResolvedURI(fs, exportPath); + int namenodeId = Nfs3Utils.getNamenodeId(config, exportURI); + URI value = namenodeUriMap.get(namenodeId); + // if a unique nnid, add it to the map + if (value == null) { + LOG.info("Added export:" + exportPath + " FileSystem URI:" + exportURI + + " with namenodeId:" + namenodeId); + 
namenodeUriMap.put(namenodeId, exportURI); + } else { + // if the nnid already exists, it better be the for the same namenode + String msg = String.format("FS:%s, Namenode ID collision for path:%s " + + "nnid:%s uri being added:%s existing uri:%s", fs.getScheme(), + exportPath, namenodeId, exportURI, value); + LOG.error(msg); + throw new FileSystemException(msg); + } + } + } + /** * Priority of the FileSystem shutdown hook. */ @@ -135,7 +198,12 @@ class DFSClientCache { } } } - + + @VisibleForTesting + public LoadingCache<DfsClientKey, DFSClient> getClientCache() { + return clientCache; + } + /** * Close all DFSClient instances in the Cache. * @param onlyAutomatic only close those that are marked for automatic closing @@ -143,9 +211,9 @@ class DFSClientCache { synchronized void closeAll(boolean onlyAutomatic) throws IOException { List<IOException> exceptions = new ArrayList<IOException>(); - ConcurrentMap<String, DFSClient> map = clientCache.asMap(); + ConcurrentMap<DfsClientKey, DFSClient> map = clientCache.asMap(); - for (Entry<String, DFSClient> item : map.entrySet()) { + for (Entry<DfsClientKey, DFSClient> item : map.entrySet()) { final DFSClient client = item.getValue(); if (client != null) { try { @@ -160,20 +228,24 @@ class DFSClientCache { throw MultipleIOException.createIOException(exceptions); } } - - private CacheLoader<String, DFSClient> clientLoader() { - return new CacheLoader<String, DFSClient>() { + + private CacheLoader<DfsClientKey, DFSClient> clientLoader() { + return new CacheLoader<DfsClientKey, DFSClient>() { @Override - public DFSClient load(String userName) throws Exception { + public DFSClient load(final DfsClientKey key) throws Exception { UserGroupInformation ugi = getUserGroupInformation( - userName, - UserGroupInformation.getCurrentUser()); + key.userName, UserGroupInformation.getCurrentUser()); // Guava requires CacheLoader never returns null. return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() { @Override public DFSClient run() throws IOException { - return new DFSClient(DFSUtilClient.getNNAddress(config), config); + URI namenodeURI = namenodeUriMap.get(key.namenodeId); + if (namenodeURI == null) { + throw new IOException("No namenode URI found for user:" + + key.userName + " namenodeId:" + key.namenodeId); + } + return new DFSClient(namenodeURI, config); } }); } @@ -181,7 +253,7 @@ class DFSClientCache { } /** - * This method uses the currentUser, and real user to create a proxy + * This method uses the currentUser, and real user to create a proxy. 
* @param effectiveUser The user who is being proxied by the real user * @param realUser The actual user who does the command * @return Proxy UserGroupInformation @@ -204,10 +276,11 @@ class DFSClientCache { return ugi; } - private RemovalListener<String, DFSClient> clientRemovalListener() { - return new RemovalListener<String, DFSClient>() { + private RemovalListener<DfsClientKey, DFSClient> clientRemovalListener() { + return new RemovalListener<DfsClientKey, DFSClient>() { @Override - public void onRemoval(RemovalNotification<String, DFSClient> notification) { + public void onRemoval( + RemovalNotification<DfsClientKey, DFSClient> notification) { DFSClient client = notification.getValue(); try { client.close(); @@ -220,12 +293,15 @@ class DFSClientCache { }; } - private RemovalListener<DFSInputStreamCaheKey, FSDataInputStream> inputStreamRemovalListener() { - return new RemovalListener<DFSClientCache.DFSInputStreamCaheKey, FSDataInputStream>() { + private RemovalListener + <DFSInputStreamCacheKey, FSDataInputStream> inputStreamRemovalListener() { + return new RemovalListener + <DFSClientCache.DFSInputStreamCacheKey, FSDataInputStream>() { @Override public void onRemoval( - RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) { + RemovalNotification<DFSInputStreamCacheKey, FSDataInputStream> + notification) { try { notification.getValue().close(); } catch (IOException ignored) { @@ -234,22 +310,24 @@ class DFSClientCache { }; } - private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() { - return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() { + private CacheLoader<DFSInputStreamCacheKey, FSDataInputStream> + inputStreamLoader() { + return new CacheLoader<DFSInputStreamCacheKey, FSDataInputStream>() { @Override - public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception { - DFSClient client = getDfsClient(key.userId); + public FSDataInputStream + load(DFSInputStreamCacheKey key) throws Exception { + DFSClient client = getDfsClient(key.userId, key.namenodeId); DFSInputStream dis = client.open(key.inodePath); return client.createWrappedInputStream(dis); } }; } - DFSClient getDfsClient(String userName) { + DFSClient getDfsClient(String userName, int namenodeId) { DFSClient client = null; try { - client = clientCache.get(userName); + client = clientCache.get(new DfsClientKey(userName, namenodeId)); } catch (ExecutionException e) { LOG.error("Failed to create DFSClient for user:" + userName + " Cause:" + e); @@ -257,8 +335,10 @@ class DFSClientCache { return client; } - FSDataInputStream getDfsInputStream(String userName, String inodePath) { - DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + FSDataInputStream getDfsInputStream(String userName, String inodePath, + int namenodeId) { + DFSInputStreamCacheKey k = + new DFSInputStreamCacheKey(userName, inodePath, namenodeId); FSDataInputStream s = null; try { s = inputstreamCache.get(k); @@ -269,8 +349,10 @@ class DFSClientCache { return s; } - public void invalidateDfsInputStream(String userName, String inodePath) { - DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + public void invalidateDfsInputStream(String userName, String inodePath, + int namenodeId) { + DFSInputStreamCacheKey k = + new DFSInputStreamCacheKey(userName, inodePath, namenodeId); inputstreamCache.invalidate(k); } } 
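[Editor's note, not part of the commit] With the DFSClientCache change above, clients and cached input streams are keyed by (user, namenodeId) instead of just the user name, and prepareAddressMap() records the resolved export URI for each distinct namenodeId (the id itself comes from Nfs3Utils, next hunk, as the hash of the namenode's socket address). A simplified standalone sketch of that bookkeeping, using plain java.net types and hypothetical hostnames; it only loosely mirrors the patch's collision handling:

    import java.net.InetSocketAddress;
    import java.net.URI;
    import java.nio.file.FileSystemException;
    import java.util.HashMap;
    import java.util.Map;

    public class NamenodeIdMapSketch {
      // Modeled on Nfs3Utils.getNamenodeId: the id is the hash code of the
      // namenode's socket address (built here directly from host:port).
      static int namenodeId(URI exportUri) {
        return new InetSocketAddress(exportUri.getHost(), exportUri.getPort()).hashCode();
      }

      private final Map<Integer, URI> namenodeUriMap = new HashMap<>();

      // Remember one URI per distinct namenodeId and reject two different
      // namenodes that happen to collide on the same id.
      void addExport(URI exportUri) throws FileSystemException {
        int id = namenodeId(exportUri);
        URI existing = namenodeUriMap.putIfAbsent(id, exportUri);
        if (existing != null && !existing.getAuthority().equals(exportUri.getAuthority())) {
          throw new FileSystemException(
              "Namenode ID collision for " + exportUri + ", already mapped to " + existing);
        }
      }

      public static void main(String[] args) throws FileSystemException {
        NamenodeIdMapSketch m = new NamenodeIdMapSketch();
        m.addExport(URI.create("hdfs://nn1.example.com:8020/user"));
        m.addExport(URI.create("hdfs://nn2.example.com:8020/data"));
        System.out.println("distinct namenodes tracked: " + m.namenodeUriMap.size()); // 2
      }
    }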
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java index e376ebd..c6da198 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java @@ -18,8 +18,17 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.file.FileSystemException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.viewfs.ViewFileSystem; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.NfsTime; @@ -224,4 +233,41 @@ public class Nfs3Utils { public static long getElapsedTime(long startTimeNano) { return System.nanoTime() - startTimeNano; } + + public static int getNamenodeId(Configuration conf) { + URI filesystemURI = FileSystem.getDefaultUri(conf); + return getNamenodeId(conf, filesystemURI); + } + + public static int getNamenodeId(Configuration conf, URI namenodeURI) { + InetSocketAddress address = + DFSUtilClient.getNNAddressCheckLogical(conf, namenodeURI); + return address.hashCode(); + } + + public static URI getResolvedURI(FileSystem fs, String exportPath) + throws IOException { + URI fsURI = fs.getUri(); + String scheme = fs.getScheme(); + if (scheme.equalsIgnoreCase(FsConstants.VIEWFS_SCHEME)) { + ViewFileSystem viewFs = (ViewFileSystem)fs; + ViewFileSystem.MountPoint[] mountPoints = viewFs.getMountPoints(); + for (ViewFileSystem.MountPoint mount : mountPoints) { + String mountedPath = mount.getMountedOnPath().toString(); + if (exportPath.startsWith(mountedPath)) { + String subpath = exportPath.substring(mountedPath.length()); + fsURI = mount.getTargetFileSystemURIs()[0].resolve(subpath); + break; + } + } + } else if (scheme.equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) { + fsURI = fsURI.resolve(exportPath); + } + + if (!fsURI.getScheme().equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) { + throw new FileSystemException("Only HDFS is supported as underlying" + + "FileSystem, fs scheme:" + scheme + " uri to be added" + fsURI); + } + return fsURI; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 2617019..5b7dc14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -442,7 +442,7 @@ class OpenFileCtx { if 
(!activeState) { LOG.info("OpenFileCtx is inactive, fileId: " - + request.getHandle().getFileId()); + + request.getHandle().dumpFileHandle()); WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); @@ -981,7 +981,8 @@ class OpenFileCtx { * Check stream status to decide if it should be closed * @return true, remove stream; false, keep stream */ - public synchronized boolean streamCleanup(long fileId, long streamTimeout) { + public synchronized boolean streamCleanup(FileHandle handle, + long streamTimeout) { Preconditions .checkState(streamTimeout >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); if (!activeState) { @@ -992,7 +993,8 @@ class OpenFileCtx { // Check the stream timeout if (checkStreamTimeout(streamTimeout)) { if (LOG.isDebugEnabled()) { - LOG.debug("stream can be closed for fileId: " + fileId); + LOG.debug("stream can be closed for fileId: " + + handle.dumpFileHandle()); } flag = true; } @@ -1188,7 +1190,7 @@ class OpenFileCtx { FileHandle handle = writeCtx.getHandle(); if (LOG.isDebugEnabled()) { - LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " + LOG.debug("do write, fileHandle " + handle.dumpFileHandle() + " offset: " + offset + " length: " + count + " stableHow: " + stableHow.name()); } @@ -1213,8 +1215,9 @@ class OpenFileCtx { writeCtx.setDataState(WriteCtx.DataState.NO_DUMP); updateNonSequentialWriteInMemory(-count); if (LOG.isDebugEnabled()) { - LOG.debug("After writing " + handle.getFileId() + " at offset " - + offset + ", updated the memory count, new value: " + LOG.debug("After writing " + handle.dumpFileHandle() + + " at offset " + offset + + ", updated the memory count, new value: " + nonSequentialWriteInMemory.get()); } } @@ -1257,8 +1260,8 @@ class OpenFileCtx { processCommits(writeCtx.getOffset() + writeCtx.getCount()); } catch (IOException e) { - LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " - + offset + " and length " + count, e); + LOG.error("Error writing to fileHandle " + handle.dumpFileHandle() + + " at offset " + offset + " and length " + count, e); if (!writeCtx.getReplied()) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO); Nfs3Utils.writeChannel(channel, response.serialize( http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java index e26fac5..e23e490 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java @@ -156,7 +156,7 @@ class OpenFileCtxCache { Entry<FileHandle, OpenFileCtx> pairs = it.next(); FileHandle handle = pairs.getKey(); OpenFileCtx ctx = pairs.getValue(); - if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) { + if (!ctx.streamCleanup(handle, streamTimeout)) { continue; } @@ -164,10 +164,10 @@ class OpenFileCtxCache { synchronized (this) { OpenFileCtx ctx2 = openFileMap.get(handle); if (ctx2 != null) { - if (ctx2.streamCleanup(handle.getFileId(), 
streamTimeout)) { + if (ctx2.streamCleanup(handle, streamTimeout)) { openFileMap.remove(handle); if (LOG.isDebugEnabled()) { - LOG.debug("After remove stream " + handle.getFileId() + LOG.debug("After remove stream " + handle.dumpFileHandle() + ", the stream number:" + size()); } ctxToRemove.add(ctx2); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 7a6aa89..0db633f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -319,12 +319,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - GETATTR3Request request; try { request = GETATTR3Request.deserialize(xdr); @@ -335,9 +329,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("GETATTR for fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("GETATTR for fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } Nfs3FileAttributes attrs = null; @@ -412,11 +414,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } SETATTR3Request request; try { @@ -428,9 +425,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS SETATTR fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS SETATTR fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } if (request.getAttr().getUpdateFields().contains(SetAttrField.SIZE)) { @@ -498,12 +503,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - LOOKUP3Request request; try { request = LOOKUP3Request.deserialize(xdr); @@ -514,15 +513,22 @@ public class 
RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); + int namenodeId = dirHandle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS LOOKUP dir fileId: " + dirHandle.getFileId() + " name: " - + fileName + " client: " + remoteAddress); + LOG.debug("NFS LOOKUP dir fileHandle: " + dirHandle.dumpFileHandle() + + " name: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } try { String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle); Nfs3FileAttributes postOpObjAttr = writeManager.getFileAttr(dfsClient, - dirHandle, fileName); + dirHandle, fileName, namenodeId); if (postOpObjAttr == null) { if (LOG.isDebugEnabled()) { LOG.debug("NFS LOOKUP fileId: " + dirHandle.getFileId() + " name: " @@ -540,7 +546,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { LOG.info("Can't get path for dir fileId: " + dirHandle.getFileId()); return new LOOKUP3Response(Nfs3Status.NFS3ERR_STALE); } - FileHandle fileHandle = new FileHandle(postOpObjAttr.getFileId()); + FileHandle fileHandle = + new FileHandle(postOpObjAttr.getFileId(), namenodeId); return new LOOKUP3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr, postOpDirAttr); @@ -566,12 +573,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - ACCESS3Request request; try { request = ACCESS3Request.deserialize(xdr); @@ -581,13 +582,21 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); - Nfs3FileAttributes attrs; + int namenodeId = handle.getNamenodeId(); + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } if (LOG.isDebugEnabled()) { - LOG.debug("NFS ACCESS fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS ACCESS fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); } + Nfs3FileAttributes attrs; try { attrs = writeManager.getFileAttr(dfsClient, handle, iug); @@ -639,12 +648,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - READLINK3Request request; try { @@ -655,9 +658,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS READLINK fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS READLINK fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } String fileIdPath = Nfs3Utils.getFileIdPath(handle); @@ -715,12 +726,6 @@ public class 
RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(userName); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - READ3Request request; try { @@ -734,9 +739,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { int count = request.getCount(); FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset - + " count: " + count + " client: " + remoteAddress); + LOG.debug("NFS READ fileHandle: " + handle.dumpFileHandle()+ " offset: " + + offset + " count: " + count + " client: " + remoteAddress); + } + + DFSClient dfsClient = clientCache.getDfsClient(userName, namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } Nfs3FileAttributes attrs; @@ -791,7 +803,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { */ for (int i = 0; i < 1; ++i) { FSDataInputStream fis = clientCache.getDfsInputStream(userName, - Nfs3Utils.getFileIdPath(handle)); + Nfs3Utils.getFileIdPath(handle), namenodeId); if (fis == null) { return new READ3Response(Nfs3Status.NFS3ERR_ACCES); @@ -805,7 +817,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { // which requires incompatible changes. if (e.getMessage().equals("Stream closed")) { clientCache.invalidateDfsInputStream(userName, - Nfs3Utils.getFileIdPath(handle)); + Nfs3Utils.getFileIdPath(handle), namenodeId); continue; } else { throw e; @@ -850,11 +862,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { SecurityHandler securityHandler, SocketAddress remoteAddress) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } WRITE3Request request; @@ -875,12 +882,20 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS WRITE fileId: " + handle.getFileId() + " offset: " + LOG.debug("NFS WRITE fileHandle: " + handle.dumpFileHandle() + " offset: " + offset + " length: " + count + " stableHow: " + stableHow.getValue() + " xid: " + xid + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + Nfs3FileAttributes preOpAttr = null; try { preOpAttr = writeManager.getFileAttr(dfsClient, handle, iug); @@ -932,11 +947,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { CREATE3Response create(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } CREATE3Request request; @@ -949,11 +959,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); + int namenodeId = 
dirHandle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS CREATE dir fileId: " + dirHandle.getFileId() + LOG.debug("NFS CREATE dir fileHandle: " + dirHandle.dumpFileHandle() + " filename: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + int createMode = request.getMode(); if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE) && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE) @@ -1016,7 +1034,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug, aixCompatMode, config); - fileHandle = new FileHandle(postOpObjAttr.getFileId()); + fileHandle = new FileHandle(postOpObjAttr.getFileId(), namenodeId); if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) { LOG.warn("Can't add more stream, close it." + " Future write will become append"); @@ -1066,11 +1084,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } MKDIR3Request request; @@ -1082,9 +1095,18 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); + int namenodeId = dirHandle.getNamenodeId(); + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + if (LOG.isDebugEnabled()) { - LOG.debug("NFS MKDIR dirId: " + dirHandle.getFileId() + " filename: " - + fileName + " client: " + remoteAddress); + LOG.debug("NFS MKDIR dirHandle: " + dirHandle.dumpFileHandle() + + " filename: " + fileName + " client: " + remoteAddress); } if (request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) { @@ -1130,11 +1152,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { setattrInternal(dfsClient, fileIdPath, setAttr3, false); postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); - objFileHandle = new FileHandle(postOpObjAttr.getFileId()); + objFileHandle = new FileHandle(postOpObjAttr.getFileId(), namenodeId); WccData dirWcc = Nfs3Utils.createWccData( Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug); return new MKDIR3Response(Nfs3Status.NFS3_OK, new FileHandle( - postOpObjAttr.getFileId()), postOpObjAttr, dirWcc); + postOpObjAttr.getFileId(), namenodeId), postOpObjAttr, dirWcc); } catch (IOException e) { LOG.warn("Exception ", e); // Try to return correct WccData @@ -1167,11 +1189,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } REMOVE3Request 
request; try { @@ -1181,12 +1198,21 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return new REMOVE3Response(Nfs3Status.NFS3ERR_INVAL); } FileHandle dirHandle = request.getHandle(); + int namenodeId = dirHandle.getNamenodeId(); + String fileName = request.getName(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS REMOVE dir fileId: " + dirHandle.getFileId() + LOG.debug("NFS REMOVE dir fileHandle: " + dirHandle.dumpFileHandle() + " fileName: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle); Nfs3FileAttributes preOpDirAttr = null; Nfs3FileAttributes postOpDirAttr = null; @@ -1247,11 +1273,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } RMDIR3Request request; try { @@ -1262,12 +1283,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); - + int namenodeId = dirHandle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS RMDIR dir fileId: " + dirHandle.getFileId() + LOG.debug("NFS RMDIR dir fileHandle: " + dirHandle.dumpFileHandle() + " fileName: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle); Nfs3FileAttributes preOpDirAttr = null; Nfs3FileAttributes postOpDirAttr = null; @@ -1332,11 +1360,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } RENAME3Request request = null; try { @@ -1347,13 +1370,28 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle fromHandle = request.getFromDirHandle(); + int fromNamenodeId = fromHandle.getNamenodeId(); String fromName = request.getFromName(); FileHandle toHandle = request.getToDirHandle(); + int toNamenodeId = toHandle.getNamenodeId(); String toName = request.getToName(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS RENAME from: " + fromHandle.getFileId() + "/" + fromName - + " to: " + toHandle.getFileId() + "/" + toName + " client: " - + remoteAddress); + LOG.debug("NFS RENAME from: " + fromHandle.dumpFileHandle() + + "/" + fromName + " to: " + toHandle.dumpFileHandle() + + "/" + toName + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), fromNamenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + + if (fromNamenodeId != 
toNamenodeId) { + // renaming file from one namenode to another is not supported + response.setStatus(Nfs3Status.NFS3ERR_INVAL); + return response; } String fromDirFileIdPath = Nfs3Utils.getFileIdPath(fromHandle); @@ -1429,12 +1467,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - SYMLINK3Request request; try { request = SYMLINK3Request.deserialize(xdr); @@ -1448,11 +1480,20 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { String name = request.getName(); String symData = request.getSymData(); String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle); + int namenodeId = dirHandle.getNamenodeId(); + // Don't do any name check to source path, just leave it to HDFS String linkIdPath = linkDirIdPath + "/" + name; if (LOG.isDebugEnabled()) { LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath - + " client: " + remoteAddress); + + " namenodeId: " + namenodeId + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -1471,7 +1512,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { .setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug)); return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle( - objAttr.getFileId()), objAttr, dirWcc); + objAttr.getFileId(), namenodeId), objAttr, dirWcc); } catch (IOException e) { LOG.warn("Exception: " + e); @@ -1524,12 +1565,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - READDIR3Request request; try { request = READDIR3Request.deserialize(xdr); @@ -1538,6 +1573,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return new READDIR3Response(Nfs3Status.NFS3ERR_INVAL); } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); + long cookie = request.getCookie(); if (cookie < 0) { LOG.error("Invalid READDIR request, with negative cookie: " + cookie); @@ -1550,8 +1587,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } if (LOG.isDebugEnabled()) { - LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: " - + cookie + " count: " + count + " client: " + remoteAddress); + LOG.debug("NFS READDIR fileHandle: " + handle.dumpFileHandle() + + " cookie: " + cookie + " count: " + count + " client: " + + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } HdfsFileStatus dirStatus; @@ -1684,10 +1729,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES); } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); - } READDIRPLUS3Request request = null; try { @@ -1698,6 +1739,7 @@ public class RpcProgramNfs3 
extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); long cookie = request.getCookie(); if (cookie < 0) { LOG.error("Invalid READDIRPLUS request, with negative cookie: " + cookie); @@ -1715,9 +1757,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } if (LOG.isDebugEnabled()) { - LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: " - + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount - + " client: " + remoteAddress); + LOG.debug("NFS READDIRPLUS fileHandle: " + handle.dumpFileHandle() + + " cookie: " + cookie + " dirCount: " + dirCount + " maxCount: " + + maxCount + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); } HdfsFileStatus dirStatus; @@ -1805,14 +1853,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { entries[0] = new READDIRPLUS3Response.EntryPlus3( postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle( - postOpDirAttr.getFileId())); + postOpDirAttr.getFileId(), namenodeId)); entries[1] = new READDIRPLUS3Response.EntryPlus3(dotdotFileId, "..", dotdotFileId, Nfs3Utils.getNfs3FileAttrFromFileStatus(dotdotStatus, - iug), new FileHandle(dotdotFileId)); + iug), new FileHandle(dotdotFileId, namenodeId)); for (int i = 2; i < n + 2; i++) { long fileId = fstatus[i - 2].getFileId(); - FileHandle childHandle = new FileHandle(fileId); + FileHandle childHandle = new FileHandle(fileId, namenodeId); Nfs3FileAttributes attr; try { attr = writeManager.getFileAttr(dfsClient, childHandle, iug); @@ -1829,7 +1877,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { entries = new READDIRPLUS3Response.EntryPlus3[n]; for (int i = 0; i < n; i++) { long fileId = fstatus[i].getFileId(); - FileHandle childHandle = new FileHandle(fileId); + FileHandle childHandle = new FileHandle(fileId, namenodeId); Nfs3FileAttributes attr; try { attr = writeManager.getFileAttr(dfsClient, childHandle, iug); @@ -1863,11 +1911,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } FSSTAT3Request request; try { @@ -1878,9 +1921,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS FSSTAT fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS FSSTAT fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -1938,12 +1989,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - FSINFO3Request request; try { request = FSINFO3Request.deserialize(xdr); @@ -1953,9 +1998,17 @@ public class RpcProgramNfs3 extends RpcProgram 
implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS FSINFO fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS FSINFO fileHandle: " + handle.dumpFileHandle() + +" client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -2003,12 +2056,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - PATHCONF3Request request; try { request = PATHCONF3Request.deserialize(xdr); @@ -2019,10 +2066,18 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { FileHandle handle = request.getHandle(); Nfs3FileAttributes attrs; + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS PATHCONF fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS PATHCONF fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -2055,11 +2110,6 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { COMMIT3Response commit(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, SocketAddress remoteAddress) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } COMMIT3Request request; try { @@ -2071,12 +2121,20 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS COMMIT fileId: " + handle.getFileId() + " offset=" + LOG.debug("NFS COMMIT fileHandle: " + handle.dumpFileHandle() + " offset=" + request.getOffset() + " count=" + request.getCount() + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + String fileIdPath = Nfs3Utils.getFileIdPath(handle); Nfs3FileAttributes preOpAttr = null; try { @@ -2097,7 +2155,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { // Insert commit as an async request writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid, - preOpAttr); + preOpAttr, namenodeId); return null; } catch (IOException e) { LOG.warn("Exception ", e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index f89679f..5d66751 100644 
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -318,9 +318,9 @@ class WriteCtx { @Override public String toString() { - return "Id:" + handle.getFileId() + " offset:" + getPlainOffset() + " " + - "count:" + count + " originalCount:" + getOriginalCount() + - " stableHow:" + stableHow + " replied:" + replied + " dataState:" + - dataState + " xid:" + xid; + return "FileHandle:" + handle.dumpFileHandle() + " offset:" + + getPlainOffset() + " " + "count:" + count + " originalCount:" + + getOriginalCount() + " stableHow:" + stableHow + " replied:" + + replied + " dataState:" + dataState + " xid:" + xid; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index 7810ce2..0a3450d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -139,7 +139,8 @@ public class WriteManager { FileHandle fileHandle = request.getHandle(); OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { - LOG.info("No opened stream for fileId: " + fileHandle.getFileId()); + LOG.info("No opened stream for fileHandle: " + + fileHandle.dumpFileHandle()); String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId()); HdfsDataOutputStream fos = null; @@ -188,7 +189,8 @@ public class WriteManager { try { fos.close(); } catch (IOException e) { - LOG.error("Can't close stream for fileId: " + handle.getFileId(), e); + LOG.error("Can't close stream for fileHandle: " + + handle.dumpFileHandle(), e); } // Notify client to retry WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); @@ -201,7 +203,8 @@ public class WriteManager { } if (LOG.isDebugEnabled()) { - LOG.debug("Opened stream for appending file: " + fileHandle.getFileId()); + LOG.debug("Opened stream for appending file: " + + fileHandle.dumpFileHandle()); } } @@ -220,7 +223,7 @@ public class WriteManager { if (openFileCtx == null) { if (LOG.isDebugEnabled()) { - LOG.debug("No opened stream for fileId: " + fileHandle.getFileId() + LOG.debug("No opened stream for fileId: " + fileHandle.dumpFileHandle() + " commitOffset=" + commitOffset + ". Return success in this case."); } @@ -263,13 +266,14 @@ public class WriteManager { } void handleCommit(DFSClient dfsClient, FileHandle fileHandle, - long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { + long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, + int namenodeId) { long startTime = System.nanoTime(); int status; OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { - LOG.info("No opened stream for fileId: " + fileHandle.getFileId() + LOG.info("No opened stream for fileId: " + fileHandle.dumpFileHandle() + " commitOffset=" + commitOffset + ". 
Return success in this case."); status = Nfs3Status.NFS3_OK; @@ -304,7 +308,9 @@ public class WriteManager { // Send out the response Nfs3FileAttributes postOpAttr = null; try { - postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug); + postOpAttr = + getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId(), + namenodeId), iug); } catch (IOException e1) { LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1); } @@ -334,13 +340,13 @@ public class WriteManager { } Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle, - String fileName) throws IOException { + String fileName, int namenodeId) throws IOException { String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName; Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) { OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr - .getFileId())); + .getFileId(), namenodeId)); if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java index b68fdb8..007803d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java @@ -101,9 +101,10 @@ public class TestClientAccessPrivilege { // Create a remove request HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeString("f1"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java index 54b7ee7..ba9d46e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java @@ -43,15 +43,17 @@ public class TestDFSClientCache { DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); - DFSClient c1 = cache.getDfsClient("test1"); - assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1")); - assertEquals(c1, cache.getDfsClient("test1")); + int namenodeId = Nfs3Utils.getNamenodeId(conf); + DFSClient c1 = cache.getDfsClient("test1", namenodeId); + assertTrue(cache.getDfsClient("test1", namenodeId) + .toString().contains("ugi=test1")); + assertEquals(c1, cache.getDfsClient("test1", namenodeId)); 
assertFalse(isDfsClientClose(c1)); - cache.getDfsClient("test2"); + cache.getDfsClient("test2", namenodeId); assertTrue(isDfsClientClose(c1)); assertTrue("cache size should be the max size or less", - cache.clientCache.size() <= MAX_CACHE_SIZE); + cache.getClientCache().size() <= MAX_CACHE_SIZE); } @Test @@ -61,6 +63,7 @@ public class TestDFSClientCache { NfsConfiguration conf = new NfsConfiguration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost"); UserGroupInformation currentUserUgi = UserGroupInformation.createRemoteUser(currentUser); currentUserUgi.setAuthenticationMethod(KERBEROS); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java index c802be0..211a166 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java @@ -21,7 +21,14 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.hdfs.nfs.mount.Mountd; @@ -31,7 +38,152 @@ import org.junit.Test; public class TestExportsTable { @Test - public void testExportPoint() throws IOException { + public void testHdfsExportPoint() throws IOException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + config.set("nfs.http.address", "0.0.0.0:0"); + + try { + cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); + cluster.waitActive(); + + // Start nfs + final Nfs3 nfsServer = new Nfs3(config); + nfsServer.startServiceInternal(false); + + Mountd mountd = nfsServer.getMountd(); + RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); + assertTrue(rpcMount.getExports().size() == 1); + + String exportInMountd = rpcMount.getExports().get(0); + assertTrue(exportInMountd.equals("/")); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testViewFsExportPoint() throws IOException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + String clusterName = RandomStringUtils.randomAlphabetic(10); + + String exportPoint = "/hdfs1,/hdfs2"; + config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint); + config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + FsConstants.VIEWFS_SCHEME + "://" + clusterName); + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); 
+ config.set("nfs.http.address", "0.0.0.0:0"); + + try { + cluster = + new MiniDFSCluster.Builder(config).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(2) + .build(); + cluster.waitActive(); + DistributedFileSystem hdfs1 = cluster.getFileSystem(0); + DistributedFileSystem hdfs2 = cluster.getFileSystem(1); + cluster.waitActive(); + Path base1 = new Path("/user1"); + Path base2 = new Path("/user2"); + hdfs1.delete(base1, true); + hdfs2.delete(base2, true); + hdfs1.mkdirs(base1); + hdfs2.mkdirs(base2); + ConfigUtil.addLink(config, clusterName, "/hdfs1", + hdfs1.makeQualified(base1).toUri()); + ConfigUtil.addLink(config, clusterName, "/hdfs2", + hdfs2.makeQualified(base2).toUri()); + + // Start nfs + final Nfs3 nfsServer = new Nfs3(config); + nfsServer.startServiceInternal(false); + + Mountd mountd = nfsServer.getMountd(); + RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); + assertTrue(rpcMount.getExports().size() == 2); + + String exportInMountd1 = rpcMount.getExports().get(0); + assertTrue(exportInMountd1.equals("/hdfs1")); + + String exportInMountd2 = rpcMount.getExports().get(1); + assertTrue(exportInMountd2.equals("/hdfs2")); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testViewFsInternalExportPoint() throws IOException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + String clusterName = RandomStringUtils.randomAlphabetic(10); + + String exportPoint = "/hdfs1/subpath"; + config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint); + config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + FsConstants.VIEWFS_SCHEME + "://" + clusterName); + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + config.set("nfs.http.address", "0.0.0.0:0"); + + try { + cluster = + new MiniDFSCluster.Builder(config).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(2) + .build(); + cluster.waitActive(); + DistributedFileSystem hdfs1 = cluster.getFileSystem(0); + DistributedFileSystem hdfs2 = cluster.getFileSystem(1); + cluster.waitActive(); + Path base1 = new Path("/user1"); + Path base2 = new Path("/user2"); + hdfs1.delete(base1, true); + hdfs2.delete(base2, true); + hdfs1.mkdirs(base1); + hdfs2.mkdirs(base2); + ConfigUtil.addLink(config, clusterName, "/hdfs1", + hdfs1.makeQualified(base1).toUri()); + ConfigUtil.addLink(config, clusterName, "/hdfs2", + hdfs2.makeQualified(base2).toUri()); + Path subPath = new Path(base1, "subpath"); + hdfs1.delete(subPath, true); + hdfs1.mkdirs(subPath); + + // Start nfs + final Nfs3 nfsServer = new Nfs3(config); + nfsServer.startServiceInternal(false); + + Mountd mountd = nfsServer.getMountd(); + RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); + assertTrue(rpcMount.getExports().size() == 1); + + String exportInMountd = rpcMount.getExports().get(0); + assertTrue(exportInMountd.equals(exportPoint)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testHdfsInternalExportPoint() throws IOException { NfsConfiguration config = new NfsConfiguration(); MiniDFSCluster cluster = null; @@ -40,10 +192,15 @@ public class TestExportsTable { // Use emphral port in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); config.setInt("nfs3.server.port", 0); - + config.set("nfs.http.address", "0.0.0.0:0"); + Path base = new 
Path(exportPoint); + try { cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); + DistributedFileSystem hdfs = cluster.getFileSystem(0); + hdfs.delete(base, true); + hdfs.mkdirs(base); // Start nfs final Nfs3 nfsServer = new Nfs3(config); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6602b5f/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java index 05ba2db..0af7ced 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java @@ -21,9 +21,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import org.apache.hadoop.fs.Path; @@ -31,8 +29,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; -import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3; -import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.nfs.nfs3.FileHandle; @@ -40,15 +36,10 @@ import org.apache.hadoop.nfs.nfs3.response.READDIR3Response; import org.apache.hadoop.nfs.nfs3.response.READDIR3Response.Entry3; import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response; import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3; -import org.apache.hadoop.oncrpc.RpcInfo; -import org.apache.hadoop.oncrpc.RpcMessage; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.SecurityHandler; import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; import org.apache.hadoop.security.authorize.ProxyUsers; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -119,10 +110,11 @@ public class TestReaddir { // Get inodeId of /tmp HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); // Create related part of the XDR request XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(0); // cookie xdr_req.writeLongAsHyper(0); // verifier @@ -139,7 +131,7 @@ public class TestReaddir { // Create related part of the XDR request xdr_req = new XDR(); - handle = new FileHandle(dirId); + handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(f2Id); // cookie xdr_req.writeLongAsHyper(0); // verifier @@ -167,10 +159,11 @@ public class TestReaddir { // Get inodeId of /tmp HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = 
Nfs3Utils.getNamenodeId(config); // Create related part of the XDR request XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(0); // cookie xdr_req.writeLongAsHyper(0); // verifier @@ -189,7 +182,7 @@ public class TestReaddir { // Create related part of the XDR request xdr_req = new XDR(); - handle = new FileHandle(dirId); + handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(f2Id); // cookie xdr_req.writeLongAsHyper(0); // verifier
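
As the test changes above show, callers now build handles with both an inode id and a namenode id (new FileHandle(dirId, namenodeId)) and derive the namenode id from the configuration via Nfs3Utils.getNamenodeId(config). The following standalone snippet is only an illustrative sketch of that round trip, assuming the APIs touched by this patch (FileHandle(long, int), serialize/deserialize over XDR, dumpFileHandle, Nfs3Utils.getNamenodeId); the class name and the sample fileId value are made up for illustration.

    import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
    import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
    import org.apache.hadoop.nfs.nfs3.FileHandle;
    import org.apache.hadoop.oncrpc.XDR;

    public class FileHandleRoundTripSketch {
      public static void main(String[] args) throws Exception {
        NfsConfiguration config = new NfsConfiguration();
        // Map the configured default filesystem to the integer id that gets
        // packed into every NFS file handle for federated/ViewFs deployments.
        int namenodeId = Nfs3Utils.getNamenodeId(config);

        long fileId = 16386L;  // hypothetical inode id, for illustration only
        FileHandle handle = new FileHandle(fileId, namenodeId);

        // Serialize into an XDR request, as the tests above do, then read the
        // handle back the way the server side would.
        XDR xdrReq = new XDR();
        handle.serialize(xdrReq);

        FileHandle parsed = new FileHandle();
        parsed.deserialize(xdrReq.asReadOnlyWrap());
        System.out.println(parsed.dumpFileHandle());  // prints fileId and namenodeId
      }
    }

The per-operation server-side pattern in RpcProgramNfs3 mirrors this: each handler reads handle.getNamenodeId() and then calls clientCache.getDfsClient(user, namenodeId) so the request is routed to the namespace that owns the handle.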

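For the ViewFs export behaviour exercised by TestExportsTable#testViewFsExportPoint, the gateway is pointed at a viewfs:// mount table and each mount link becomes an export point. The sketch below shows that configuration shape under stated assumptions: the mount-table name and the namenode URIs are placeholders, not values from the patch; only the keys and helpers (FS_DEFAULT_NAME_KEY, FsConstants.VIEWFS_SCHEME, ConfigUtil.addLink, NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY) come from the code above.

    import java.net.URI;

    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.fs.FsConstants;
    import org.apache.hadoop.fs.viewfs.ConfigUtil;
    import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
    import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;

    public class ViewFsNfsExportSketch {
      public static NfsConfiguration buildConfig() {
        NfsConfiguration config = new NfsConfiguration();
        String clusterName = "federatedCluster";  // hypothetical mount-table name

        // Use a ViewFs mount table as the default filesystem instead of a
        // single namenode.
        config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
            FsConstants.VIEWFS_SCHEME + "://" + clusterName);

        // Each mount link maps an export point to a directory in one
        // federated namespace; the URIs here are placeholders.
        ConfigUtil.addLink(config, clusterName, "/hdfs1",
            URI.create("hdfs://nn1.example.com:8020/user1"));
        ConfigUtil.addLink(config, clusterName, "/hdfs2",
            URI.create("hdfs://nn2.example.com:8020/user2"));

        // Export both mount points through the NFS gateway, mirroring the
        // assertions in testViewFsExportPoint above.
        config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
            "/hdfs1,/hdfs2");
        return config;
      }
    }

With a configuration like this, RpcProgramMountd advertises one export per mount link (the tests assert "/hdfs1" and "/hdfs2"), and handles created under each export carry the namenode id of the namespace backing that link.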