umamaheswararao commented on a change in pull request #2185:
URL: https://github.com/apache/hadoop/pull/2185#discussion_r471268265



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
##########
@@ -166,6 +166,41 @@ public static void addLinkNfly(final Configuration conf, 
final String src,
     addLinkNfly(conf, getDefaultMountTableName(conf), src, null, targets);
   }
 
+
+  /**
+   * Add a LinkRegex to the config for the specified mount table.

Review comment:
       Below params javadoc does not help anything I think. If you want to add, 
please add description( most of that params seems self explanatory ), otherwise 
just remove params part.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
##########
@@ -646,102 +714,222 @@ boolean isInternalDir() {
   }
 
   /**
-   * Resolve the pathname p relative to root InodeDir
+   * Resolve the pathname p relative to root InodeDir.
    * @param p - input path
    * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining 
path
    * @throws FileNotFoundException
    */
   ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
       throws FileNotFoundException {
-    String[] path = breakIntoPathComponents(p);
-    if (path.length <= 1) { // special case for when path is "/"
-      T targetFs = root.isInternalDir() ?
-          getRootDir().getInternalDirFs() : 
getRootLink().getTargetFileSystem();
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-          targetFs, root.fullPath, SlashPath);
-      return res;
-    }
+    ResolveResult<T> resolveResult = null;
+    resolveResult = getResolveResultFromCache(p, resolveLastComponent);
+    if (resolveResult != null) {
+      return resolveResult;
+    }
+
+    try {
+      String[] path = breakIntoPathComponents(p);
+      if (path.length <= 1) { // special case for when path is "/"
+        T targetFs = root.isInternalDir() ?
+            getRootDir().getInternalDirFs()
+            : getRootLink().getTargetFileSystem();
+        resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+            targetFs, root.fullPath, SlashPath);
+        return resolveResult;
+      }
 
-    /**
-     * linkMergeSlash has been configured. The root of this mount table has
-     * been linked to the root directory of a file system.
-     * The first non-slash path component should be name of the mount table.
-     */
-    if (root.isLink()) {
-      Path remainingPath;
-      StringBuilder remainingPathStr = new StringBuilder();
-      // ignore first slash
-      for (int i = 1; i < path.length; i++) {
-        remainingPathStr.append("/").append(path[i]);
+      /**
+       * linkMergeSlash has been configured. The root of this mount table has
+       * been linked to the root directory of a file system.
+       * The first non-slash path component should be name of the mount table.
+       */
+      if (root.isLink()) {
+        Path remainingPath;
+        StringBuilder remainingPathStr = new StringBuilder();
+        // ignore first slash
+        for (int i = 1; i < path.length; i++) {
+          remainingPathStr.append("/").append(path[i]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+        resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+            getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-          getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
-      return res;
-    }
-    Preconditions.checkState(root.isInternalDir());
-    INodeDir<T> curInode = getRootDir();
+      Preconditions.checkState(root.isInternalDir());
+      INodeDir<T> curInode = getRootDir();
 
-    int i;
-    // ignore first slash
-    for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
-      INode<T> nextInode = curInode.resolveInternal(path[i]);
-      if (nextInode == null) {
-        if (hasFallbackLink()) {
-          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-              getRootFallbackLink().getTargetFileSystem(),
-              root.fullPath, new Path(p));
-        } else {
-          StringBuilder failedAt = new StringBuilder(path[0]);
-          for (int j = 1; j <= i; ++j) {
-            failedAt.append('/').append(path[j]);
+      // Try to resolve path in the regex mount point
+      resolveResult = tryResolveInRegexMountpoint(p, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
+      }
+
+      int i;
+      // ignore first slash
+      for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
+        INode<T> nextInode = curInode.resolveInternal(path[i]);
+        if (nextInode == null) {
+          if (hasFallbackLink()) {
+            resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+                getRootFallbackLink().getTargetFileSystem(), root.fullPath,
+                new Path(p));
+            return resolveResult;
+          } else {
+            StringBuilder failedAt = new StringBuilder(path[0]);
+            for (int j = 1; j <= i; ++j) {
+              failedAt.append('/').append(path[j]);
+            }
+            throw (new FileNotFoundException(
+                "File/Directory does not exist: " + failedAt.toString()));
           }
-          throw (new FileNotFoundException(
-              "File/Directory does not exist: " + failedAt.toString()));
         }
-      }
 
-      if (nextInode.isLink()) {
-        final INodeLink<T> link = (INodeLink<T>) nextInode;
-        final Path remainingPath;
-        if (i >= path.length - 1) {
-          remainingPath = SlashPath;
-        } else {
-          StringBuilder remainingPathStr = new StringBuilder("/" + path[i + 
1]);
-          for (int j = i + 2; j < path.length; ++j) {
-            remainingPathStr.append('/').append(path[j]);
+        if (nextInode.isLink()) {
+          final INodeLink<T> link = (INodeLink<T>) nextInode;
+          final Path remainingPath;
+          if (i >= path.length - 1) {
+            remainingPath = SlashPath;
+          } else {
+            StringBuilder remainingPathStr =
+                new StringBuilder("/" + path[i + 1]);
+            for (int j = i + 2; j < path.length; ++j) {
+              remainingPathStr.append('/').append(path[j]);
+            }
+            remainingPath = new Path(remainingPathStr.toString());
           }
-          remainingPath = new Path(remainingPathStr.toString());
+          resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+              link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
+          return resolveResult;
+        } else if (nextInode.isInternalDir()) {
+          curInode = (INodeDir<T>) nextInode;
         }
-        final ResolveResult<T> res =
-            new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-                link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
-        return res;
-      } else if (nextInode.isInternalDir()) {
-        curInode = (INodeDir<T>) nextInode;
+      }
+
+      // We have resolved to an internal dir in mount table.
+      Path remainingPath;
+      if (resolveLastComponent) {
+        remainingPath = SlashPath;
+      } else {
+        // note we have taken care of when path is "/" above
+        // for internal dirs rem-path does not start with / since the lookup
+        // that follows will do a children.get(remaningPath) and will have to
+        // strip-out the initial /
+        StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
+        for (int j = i + 1; j < path.length; ++j) {
+          remainingPathStr.append('/').append(path[j]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+      }
+      resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+          curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
+      return resolveResult;
+    } finally {
+      if (pathResolutionCacheCapacity > 0 && resolveResult != null) {
+        addResolveResultToCache(p, resolveLastComponent, resolveResult);
       }
     }
+  }
 
-    // We have resolved to an internal dir in mount table.
-    Path remainingPath;
-    if (resolveLastComponent) {
-      remainingPath = SlashPath;
-    } else {
-      // note we have taken care of when path is "/" above
-      // for internal dirs rem-path does not start with / since the lookup
-      // that follows will do a children.get(remaningPath) and will have to
-      // strip-out the initial /
-      StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
-      for (int j = i + 1; j < path.length; ++j) {
-        remainingPathStr.append('/').append(path[j]);
+  /**
+   * Walk through all regex mount points to see
+   * whether the path match any regex expressions.
+   *
+   * @param srcPath
+   * @param resolveLastComponent
+   * @return
+   */
+  protected ResolveResult<T> tryResolveInRegexMountpoint(final String srcPath,
+      final boolean resolveLastComponent) {
+    for (RegexMountPoint regexMountPoint : regexMountPointList) {
+      ResolveResult resolveResult =
+          regexMountPoint.resolve(srcPath, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
     }
-    final ResolveResult<T> res =
-        new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-            curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
-    return res;
+    return null;
+  }
+
+  /**
+   * Build resolve result return to caller.
+   *
+   * @param resultKind
+   * @param resolvedPathStr
+   * @param targetOfResolvedPathStr
+   * @param remainingPath
+   * @return
+   */
+  protected ResolveResult<T> buildResolveResultForRegexMountPoint(
+      ResultKind resultKind, String resolvedPathStr,
+      String targetOfResolvedPathStr, Path remainingPath) {
+    try {
+      T targetFs = getTargetFileSystem(new URI(targetOfResolvedPathStr));
+      return new ResolveResult<T>(resultKind, targetFs, resolvedPathStr,
+          remainingPath);
+    } catch (IOException ex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          ex);
+      return null;
+    } catch (URISyntaxException uex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          uex);
+      return null;
+    }
+  }
+
+  /**
+   * Return resolution cache capacity.
+   *
+   * @return
+   */
+  public int getPathResolutionCacheCapacity() {
+    return pathResolutionCacheCapacity;
+  }
+
+  private void addResolveResultToCache(final String pathStr,
+      final Boolean resolveLastComponent,
+      final ResolveResult<T> resolveResult) {
+    try {
+      cacheRWLock.writeLock().lock();
+      String key = getResolveCacheKeyStr(pathStr, resolveLastComponent);
+      pathResolutionCache.put(key, resolveResult);
+    } finally {
+      cacheRWLock.writeLock().unlock();
+    }
+  }
+
+  private ResolveResult<T> getResolveResultFromCache(final String pathStr,
+      final Boolean resolveLastComponent) {
+    if (pathResolutionCacheCapacity <= 0) {
+      return null;
+    }
+    try {
+      cacheRWLock.readLock().lock();

Review comment:
       same as above.

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
##########
@@ -226,7 +239,14 @@ void addLink(final String pathComponent, final 
INodeLink<T> link)
      * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkNfly
      * Refer: {@link Constants#CONFIG_VIEWFS_LINK_NFLY}
      */
-    NFLY;
+    NFLY,
+    /**
+     * Link entry which source are regex exrepssions and target refer matched
+     * group from source
+     * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMerge

Review comment:
       you mean fs.viewfs.mounttable.<mnt_tbl_name>.linkRegex ?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkRegex.java
##########
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
+import static 
org.apache.hadoop.fs.viewfs.RegexMountPoint.INTERCEPTOR_INTERNAL_SEP;
+
+/**
+ * Test linkRegex node type for view file system.
+ */
+public class TestViewFileSystemLinkRegex extends ViewFileSystemBaseTest {
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(TestViewFileSystemLinkRegex.class);
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static Configuration clusterConfig;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final FileSystem[] FS_HDFS = new 
FileSystem[NAME_SPACES_COUNT];
+  private static final String CLUSTER_NAME =
+      "TestViewFileSystemLinkRegexCluster";
+  private static final File TEST_DIR = GenericTestUtils
+      .getTestDir(TestViewFileSystemLinkRegex.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkRegex";
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException {
+    SupportsBlocks = true;
+    clusterConfig = ViewFileSystemTestSetup.createConfig();
+    clusterConfig.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(clusterConfig).nnTopology(
+        MiniDFSNNTopology.simpleFederatedTopology(NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT).build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under 
the
+   * root of the FS, and so that we don't try to delete the test dir, but 
rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  public String buildReplaceInterceptorSettingString(String srcRegex,
+      String replaceString) {
+    return
+        
RegexMountPointInterceptorType.REPLACE_RESOLVED_DST_PATH.getConfigName()
+            + INTERCEPTOR_INTERNAL_SEP + srcRegex + INTERCEPTOR_INTERNAL_SEP
+            + replaceString;
+  }
+
+  public String linkInterceptorSettings(
+      List<String> interceptorSettingStrList) {
+    StringBuilder stringBuilder = new StringBuilder();
+    int listSize = interceptorSettingStrList.size();
+    for (int i = 0; i < listSize; ++i) {
+      stringBuilder.append(interceptorSettingStrList.get(i));
+      if (i < listSize - 1) {
+        stringBuilder.append(RegexMountPoint.INTERCEPTOR_SEP);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  @Test
+  public void testConfLinkRegexIndexMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // (^/(\w+),/targetTestRoot/$1)
+    // => /targetTestRoot/testConfLinkRegexIndexMapping1
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(\\w+)";
+    String dstPathStr = targetTestRoot + "$1";
+    Path srcPath = new Path("/testConfLinkRegexIndexMapping1");
+    Path expectedResolveResult =
+        new Path(dstPathStr.replace("$1", "testConfLinkRegexIndexMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    // ^/(\w+, /targetTestRoot/${1})
+    // => /targetTestRoot/testConfLinkRegexIndexMapping2
+    dstPathStr = targetTestRoot + "${1}";
+    srcPath = new Path("/testConfLinkRegexIndexMapping2");
+    expectedResolveResult =
+        new Path(dstPathStr.replace("${1}", "testConfLinkRegexIndexMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping3/file1
+    dstPathStr = targetTestRoot + "$1";
+    srcPath = new Path("/testConfLinkRegexIndexMapping3/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping3/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1/)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping4/file1
+    dstPathStr = targetTestRoot + "$1/";
+    srcPath = new Path("/testConfLinkRegexIndexMapping4/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping4/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+  }
+
+  @Test
+  public void testConfLinkRegexNamedGroupMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // ^/(?<firstDir>\\w+) = > /targetTestRoot/$firstDir
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(?<firstDir>\\w+)";
+    String dstPathStr = targetTestRoot + "$firstDir";
+    Path srcPath = new Path("/testConfLinkRegexNamedGroupMapping1");
+    Path expectedResolveResult = new Path(
+        dstPathStr.replace("$firstDir", 
"testConfLinkRegexNamedGroupMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));

Review comment:
       same as above...please take care of other similar places.

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
##########
@@ -1430,4 +1432,49 @@ public void testGetContentSummaryWithFileInLocalFS() 
throws Exception {
           summaryAfter.getLength());
     }
   }
+
+  @Test
+  public void testMountPointCache() throws Exception {
+    conf.setInt(Constants.CONFIG_VIEWFS_PATH_RESOLUTION_CACHE_CAPACITY, 1);
+    conf.setBoolean("fs.viewfs.impl.disable.cache", true);

Review comment:
       Not about this test, In general on inner cache disabling: There was a 
suggestion for avoiding explicitly user disabling it. If that not possible, you 
may want to have check while analyzing mount points itself that, if cache is 
enabled and using regex mounts, then you may want to fail fs initialization 
itself? 

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
##########
@@ -646,102 +714,222 @@ boolean isInternalDir() {
   }
 
   /**
-   * Resolve the pathname p relative to root InodeDir
+   * Resolve the pathname p relative to root InodeDir.
    * @param p - input path
    * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining 
path
    * @throws FileNotFoundException
    */
   ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
       throws FileNotFoundException {
-    String[] path = breakIntoPathComponents(p);
-    if (path.length <= 1) { // special case for when path is "/"
-      T targetFs = root.isInternalDir() ?
-          getRootDir().getInternalDirFs() : 
getRootLink().getTargetFileSystem();
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-          targetFs, root.fullPath, SlashPath);
-      return res;
-    }
+    ResolveResult<T> resolveResult = null;
+    resolveResult = getResolveResultFromCache(p, resolveLastComponent);
+    if (resolveResult != null) {
+      return resolveResult;
+    }
+
+    try {
+      String[] path = breakIntoPathComponents(p);
+      if (path.length <= 1) { // special case for when path is "/"
+        T targetFs = root.isInternalDir() ?
+            getRootDir().getInternalDirFs()
+            : getRootLink().getTargetFileSystem();
+        resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+            targetFs, root.fullPath, SlashPath);
+        return resolveResult;
+      }
 
-    /**
-     * linkMergeSlash has been configured. The root of this mount table has
-     * been linked to the root directory of a file system.
-     * The first non-slash path component should be name of the mount table.
-     */
-    if (root.isLink()) {
-      Path remainingPath;
-      StringBuilder remainingPathStr = new StringBuilder();
-      // ignore first slash
-      for (int i = 1; i < path.length; i++) {
-        remainingPathStr.append("/").append(path[i]);
+      /**
+       * linkMergeSlash has been configured. The root of this mount table has
+       * been linked to the root directory of a file system.
+       * The first non-slash path component should be name of the mount table.
+       */
+      if (root.isLink()) {
+        Path remainingPath;
+        StringBuilder remainingPathStr = new StringBuilder();
+        // ignore first slash
+        for (int i = 1; i < path.length; i++) {
+          remainingPathStr.append("/").append(path[i]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+        resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+            getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-          getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
-      return res;
-    }
-    Preconditions.checkState(root.isInternalDir());
-    INodeDir<T> curInode = getRootDir();
+      Preconditions.checkState(root.isInternalDir());
+      INodeDir<T> curInode = getRootDir();
 
-    int i;
-    // ignore first slash
-    for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
-      INode<T> nextInode = curInode.resolveInternal(path[i]);
-      if (nextInode == null) {
-        if (hasFallbackLink()) {
-          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-              getRootFallbackLink().getTargetFileSystem(),
-              root.fullPath, new Path(p));
-        } else {
-          StringBuilder failedAt = new StringBuilder(path[0]);
-          for (int j = 1; j <= i; ++j) {
-            failedAt.append('/').append(path[j]);
+      // Try to resolve path in the regex mount point
+      resolveResult = tryResolveInRegexMountpoint(p, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
+      }
+
+      int i;
+      // ignore first slash
+      for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
+        INode<T> nextInode = curInode.resolveInternal(path[i]);
+        if (nextInode == null) {
+          if (hasFallbackLink()) {
+            resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+                getRootFallbackLink().getTargetFileSystem(), root.fullPath,
+                new Path(p));
+            return resolveResult;
+          } else {
+            StringBuilder failedAt = new StringBuilder(path[0]);
+            for (int j = 1; j <= i; ++j) {
+              failedAt.append('/').append(path[j]);
+            }
+            throw (new FileNotFoundException(
+                "File/Directory does not exist: " + failedAt.toString()));
           }
-          throw (new FileNotFoundException(
-              "File/Directory does not exist: " + failedAt.toString()));
         }
-      }
 
-      if (nextInode.isLink()) {
-        final INodeLink<T> link = (INodeLink<T>) nextInode;
-        final Path remainingPath;
-        if (i >= path.length - 1) {
-          remainingPath = SlashPath;
-        } else {
-          StringBuilder remainingPathStr = new StringBuilder("/" + path[i + 
1]);
-          for (int j = i + 2; j < path.length; ++j) {
-            remainingPathStr.append('/').append(path[j]);
+        if (nextInode.isLink()) {
+          final INodeLink<T> link = (INodeLink<T>) nextInode;
+          final Path remainingPath;
+          if (i >= path.length - 1) {
+            remainingPath = SlashPath;
+          } else {
+            StringBuilder remainingPathStr =
+                new StringBuilder("/" + path[i + 1]);
+            for (int j = i + 2; j < path.length; ++j) {
+              remainingPathStr.append('/').append(path[j]);
+            }
+            remainingPath = new Path(remainingPathStr.toString());
           }
-          remainingPath = new Path(remainingPathStr.toString());
+          resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+              link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
+          return resolveResult;
+        } else if (nextInode.isInternalDir()) {
+          curInode = (INodeDir<T>) nextInode;
         }
-        final ResolveResult<T> res =
-            new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-                link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
-        return res;
-      } else if (nextInode.isInternalDir()) {
-        curInode = (INodeDir<T>) nextInode;
+      }
+
+      // We have resolved to an internal dir in mount table.
+      Path remainingPath;
+      if (resolveLastComponent) {
+        remainingPath = SlashPath;
+      } else {
+        // note we have taken care of when path is "/" above
+        // for internal dirs rem-path does not start with / since the lookup
+        // that follows will do a children.get(remaningPath) and will have to
+        // strip-out the initial /
+        StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
+        for (int j = i + 1; j < path.length; ++j) {
+          remainingPathStr.append('/').append(path[j]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+      }
+      resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+          curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
+      return resolveResult;
+    } finally {
+      if (pathResolutionCacheCapacity > 0 && resolveResult != null) {
+        addResolveResultToCache(p, resolveLastComponent, resolveResult);
       }
     }
+  }
 
-    // We have resolved to an internal dir in mount table.
-    Path remainingPath;
-    if (resolveLastComponent) {
-      remainingPath = SlashPath;
-    } else {
-      // note we have taken care of when path is "/" above
-      // for internal dirs rem-path does not start with / since the lookup
-      // that follows will do a children.get(remaningPath) and will have to
-      // strip-out the initial /
-      StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
-      for (int j = i + 1; j < path.length; ++j) {
-        remainingPathStr.append('/').append(path[j]);
+  /**
+   * Walk through all regex mount points to see
+   * whether the path match any regex expressions.
+   *
+   * @param srcPath
+   * @param resolveLastComponent
+   * @return
+   */
+  protected ResolveResult<T> tryResolveInRegexMountpoint(final String srcPath,
+      final boolean resolveLastComponent) {
+    for (RegexMountPoint regexMountPoint : regexMountPointList) {
+      ResolveResult resolveResult =
+          regexMountPoint.resolve(srcPath, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
     }
-    final ResolveResult<T> res =
-        new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-            curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
-    return res;
+    return null;
+  }
+
+  /**
+   * Build resolve result return to caller.
+   *
+   * @param resultKind
+   * @param resolvedPathStr
+   * @param targetOfResolvedPathStr
+   * @param remainingPath
+   * @return
+   */
+  protected ResolveResult<T> buildResolveResultForRegexMountPoint(
+      ResultKind resultKind, String resolvedPathStr,
+      String targetOfResolvedPathStr, Path remainingPath) {
+    try {
+      T targetFs = getTargetFileSystem(new URI(targetOfResolvedPathStr));
+      return new ResolveResult<T>(resultKind, targetFs, resolvedPathStr,
+          remainingPath);
+    } catch (IOException ex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          ex);
+      return null;
+    } catch (URISyntaxException uex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          uex);
+      return null;
+    }
+  }
+
+  /**
+   * Return resolution cache capacity.
+   *
+   * @return
+   */
+  public int getPathResolutionCacheCapacity() {
+    return pathResolutionCacheCapacity;
+  }
+

Review comment:
       Since this patch already big and this cache improvement seems to be 
generic for all cases, can we file separate Sub JIRA for this cache part? 

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/RegexMountPoint.java
##########
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.viewfs.InodeTree.SlashPath;
+
+/**
+ * Regex mount point is build to implement regex based mount point.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class RegexMountPoint<T> {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(RegexMountPoint.class.getName());
+
+  private InodeTree inodeTree;
+  private String srcPathRegex;
+  private Pattern srcPattern;
+  private String dstPath;
+  private String interceptorSettingsString;
+  private List<RegexMountPointInterceptor> interceptorList;
+
+  public static final String SETTING_SRCREGEX_SEP = "#.";
+  public static final char INTERCEPTOR_SEP = ';';
+  public static final char INTERCEPTOR_INTERNAL_SEP = ':';
+  // ${var},$var
+  public static final Pattern VAR_PATTERN_IN_DEST =
+      Pattern.compile("\\$((\\{\\w+\\})|(\\w+))");
+
+  // key => $key or key = > ${key}
+  private Map<String, Set<String>> varInDestPathMap;
+
+  public Map<String, Set<String>> getVarInDestPathMap() {
+    return varInDestPathMap;
+  }
+
+  RegexMountPoint(InodeTree inodeTree, String sourcePathRegex,
+      String destPath, String settingsStr) {
+    this.inodeTree = inodeTree;
+    this.srcPathRegex = sourcePathRegex;
+    this.dstPath = destPath;
+    this.interceptorSettingsString = settingsStr;
+    this.interceptorList = new ArrayList<>();
+  }
+
+  /**
+   * Initialize regex mount point.
+   *
+   * @throws IOException
+   */
+  public void initialize() throws IOException {
+    try {
+      srcPattern = Pattern.compile(srcPathRegex);
+    } catch (PatternSyntaxException ex) {
+      throw new IOException(
+          "Failed to initialized mount point due to bad src path regex:"
+              + srcPathRegex + ", dstPath:" + dstPath, ex);
+    }
+    varInDestPathMap = getVarListInString(dstPath);
+    initializeInterceptors();
+  }
+
+  private void initializeInterceptors() throws IOException {
+    if (interceptorSettingsString == null
+        || interceptorSettingsString.isEmpty()) {
+      return;
+    }
+    String[] interceptorStrArray =
+        StringUtils.split(interceptorSettingsString, INTERCEPTOR_SEP);
+    for (String interceptorStr : interceptorStrArray) {
+      RegexMountPointInterceptor interceptor =
+          RegexMountPointInterceptorFactory.create(interceptorStr);
+      if (interceptor == null) {
+        throw new IOException(
+            "Illegal settings String " + interceptorSettingsString);
+      }
+      interceptor.initialize();
+      interceptorList.add(interceptor);
+    }
+  }
+
+  /**
+   * Get $var1 and $var2 style variables in string.
+   *
+   * @param input
+   * @return
+   */
+  public static Map<String, Set<String>> getVarListInString(String input) {
+    Map<String, Set<String>> varMap = new HashMap<>();
+    Matcher matcher = VAR_PATTERN_IN_DEST.matcher(input);
+    while (matcher.find()) {
+      // $var or ${var}
+      String varName = matcher.group(0);
+      // var or {var}
+      String strippedVarName = matcher.group(1);
+      if (strippedVarName.startsWith("{")) {
+        // {varName} = > varName
+        strippedVarName =
+            strippedVarName.substring(1, strippedVarName.length() - 1);
+      }
+      varMap.putIfAbsent(strippedVarName, new HashSet<>());
+      varMap.get(strippedVarName).add(varName);
+    }
+    return varMap;
+  }
+
+  public String getSrcPathRegex() {
+    return srcPathRegex;
+  }
+
+  public Pattern getSrcPattern() {
+    return srcPattern;
+  }
+
+  public String getDstPath() {
+    return dstPath;
+  }
+
+  public static Pattern getVarPatternInDest() {
+    return VAR_PATTERN_IN_DEST;
+  }
+
+  /**
+   * Get resolved path from regex mount points.
+   * @param srcPath
+   * @param resolveLastComponent
+   * @return
+   */
+  public InodeTree.ResolveResult<T> resolve(final String srcPath,
+      final boolean resolveLastComponent) {
+    String pathStrToResolve = srcPath;
+    if (!resolveLastComponent) {
+      int lastSlashIndex = srcPath.lastIndexOf(SlashPath.toString());
+      if (lastSlashIndex == -1) {
+        return null;
+      }
+      pathStrToResolve = srcPath.substring(0, lastSlashIndex);
+    }
+    for (RegexMountPointInterceptor interceptor : interceptorList) {
+      pathStrToResolve = interceptor.interceptSource(pathStrToResolve);
+    }

Review comment:
       nit: typo: give space after comma

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
##########
@@ -646,102 +714,222 @@ boolean isInternalDir() {
   }
 
   /**
-   * Resolve the pathname p relative to root InodeDir
+   * Resolve the pathname p relative to root InodeDir.
    * @param p - input path
    * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining 
path
    * @throws FileNotFoundException
    */
   ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
       throws FileNotFoundException {
-    String[] path = breakIntoPathComponents(p);
-    if (path.length <= 1) { // special case for when path is "/"
-      T targetFs = root.isInternalDir() ?
-          getRootDir().getInternalDirFs() : 
getRootLink().getTargetFileSystem();
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-          targetFs, root.fullPath, SlashPath);
-      return res;
-    }
+    ResolveResult<T> resolveResult = null;
+    resolveResult = getResolveResultFromCache(p, resolveLastComponent);
+    if (resolveResult != null) {
+      return resolveResult;
+    }
+
+    try {
+      String[] path = breakIntoPathComponents(p);
+      if (path.length <= 1) { // special case for when path is "/"
+        T targetFs = root.isInternalDir() ?
+            getRootDir().getInternalDirFs()
+            : getRootLink().getTargetFileSystem();
+        resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+            targetFs, root.fullPath, SlashPath);
+        return resolveResult;
+      }
 
-    /**
-     * linkMergeSlash has been configured. The root of this mount table has
-     * been linked to the root directory of a file system.
-     * The first non-slash path component should be name of the mount table.
-     */
-    if (root.isLink()) {
-      Path remainingPath;
-      StringBuilder remainingPathStr = new StringBuilder();
-      // ignore first slash
-      for (int i = 1; i < path.length; i++) {
-        remainingPathStr.append("/").append(path[i]);
+      /**
+       * linkMergeSlash has been configured. The root of this mount table has
+       * been linked to the root directory of a file system.
+       * The first non-slash path component should be name of the mount table.
+       */
+      if (root.isLink()) {
+        Path remainingPath;
+        StringBuilder remainingPathStr = new StringBuilder();
+        // ignore first slash
+        for (int i = 1; i < path.length; i++) {
+          remainingPathStr.append("/").append(path[i]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+        resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+            getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-          getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
-      return res;
-    }
-    Preconditions.checkState(root.isInternalDir());
-    INodeDir<T> curInode = getRootDir();
+      Preconditions.checkState(root.isInternalDir());
+      INodeDir<T> curInode = getRootDir();
 
-    int i;
-    // ignore first slash
-    for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
-      INode<T> nextInode = curInode.resolveInternal(path[i]);
-      if (nextInode == null) {
-        if (hasFallbackLink()) {
-          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-              getRootFallbackLink().getTargetFileSystem(),
-              root.fullPath, new Path(p));
-        } else {
-          StringBuilder failedAt = new StringBuilder(path[0]);
-          for (int j = 1; j <= i; ++j) {
-            failedAt.append('/').append(path[j]);
+      // Try to resolve path in the regex mount point
+      resolveResult = tryResolveInRegexMountpoint(p, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
+      }
+
+      int i;
+      // ignore first slash
+      for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
+        INode<T> nextInode = curInode.resolveInternal(path[i]);
+        if (nextInode == null) {
+          if (hasFallbackLink()) {
+            resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+                getRootFallbackLink().getTargetFileSystem(), root.fullPath,
+                new Path(p));
+            return resolveResult;
+          } else {
+            StringBuilder failedAt = new StringBuilder(path[0]);
+            for (int j = 1; j <= i; ++j) {
+              failedAt.append('/').append(path[j]);
+            }
+            throw (new FileNotFoundException(
+                "File/Directory does not exist: " + failedAt.toString()));
           }
-          throw (new FileNotFoundException(
-              "File/Directory does not exist: " + failedAt.toString()));
         }
-      }
 
-      if (nextInode.isLink()) {
-        final INodeLink<T> link = (INodeLink<T>) nextInode;
-        final Path remainingPath;
-        if (i >= path.length - 1) {
-          remainingPath = SlashPath;
-        } else {
-          StringBuilder remainingPathStr = new StringBuilder("/" + path[i + 
1]);
-          for (int j = i + 2; j < path.length; ++j) {
-            remainingPathStr.append('/').append(path[j]);
+        if (nextInode.isLink()) {
+          final INodeLink<T> link = (INodeLink<T>) nextInode;
+          final Path remainingPath;
+          if (i >= path.length - 1) {
+            remainingPath = SlashPath;
+          } else {
+            StringBuilder remainingPathStr =
+                new StringBuilder("/" + path[i + 1]);
+            for (int j = i + 2; j < path.length; ++j) {
+              remainingPathStr.append('/').append(path[j]);
+            }
+            remainingPath = new Path(remainingPathStr.toString());
           }
-          remainingPath = new Path(remainingPathStr.toString());
+          resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+              link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
+          return resolveResult;
+        } else if (nextInode.isInternalDir()) {
+          curInode = (INodeDir<T>) nextInode;
         }
-        final ResolveResult<T> res =
-            new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-                link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
-        return res;
-      } else if (nextInode.isInternalDir()) {
-        curInode = (INodeDir<T>) nextInode;
+      }
+
+      // We have resolved to an internal dir in mount table.
+      Path remainingPath;
+      if (resolveLastComponent) {
+        remainingPath = SlashPath;
+      } else {
+        // note we have taken care of when path is "/" above
+        // for internal dirs rem-path does not start with / since the lookup
+        // that follows will do a children.get(remaningPath) and will have to
+        // strip-out the initial /
+        StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
+        for (int j = i + 1; j < path.length; ++j) {
+          remainingPathStr.append('/').append(path[j]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+      }
+      resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+          curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
+      return resolveResult;
+    } finally {
+      if (pathResolutionCacheCapacity > 0 && resolveResult != null) {
+        addResolveResultToCache(p, resolveLastComponent, resolveResult);
       }
     }
+  }
 
-    // We have resolved to an internal dir in mount table.
-    Path remainingPath;
-    if (resolveLastComponent) {
-      remainingPath = SlashPath;
-    } else {
-      // note we have taken care of when path is "/" above
-      // for internal dirs rem-path does not start with / since the lookup
-      // that follows will do a children.get(remaningPath) and will have to
-      // strip-out the initial /
-      StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
-      for (int j = i + 1; j < path.length; ++j) {
-        remainingPathStr.append('/').append(path[j]);
+  /**
+   * Walk through all regex mount points to see
+   * whether the path match any regex expressions.
+   *
+   * @param srcPath
+   * @param resolveLastComponent
+   * @return
+   */
+  protected ResolveResult<T> tryResolveInRegexMountpoint(final String srcPath,
+      final boolean resolveLastComponent) {
+    for (RegexMountPoint regexMountPoint : regexMountPointList) {
+      ResolveResult resolveResult =
+          regexMountPoint.resolve(srcPath, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
     }
-    final ResolveResult<T> res =
-        new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-            curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
-    return res;
+    return null;
+  }
+
+  /**
+   * Build resolve result return to caller.
+   *
+   * @param resultKind
+   * @param resolvedPathStr
+   * @param targetOfResolvedPathStr
+   * @param remainingPath
+   * @return
+   */
+  protected ResolveResult<T> buildResolveResultForRegexMountPoint(
+      ResultKind resultKind, String resolvedPathStr,
+      String targetOfResolvedPathStr, Path remainingPath) {
+    try {
+      T targetFs = getTargetFileSystem(new URI(targetOfResolvedPathStr));
+      return new ResolveResult<T>(resultKind, targetFs, resolvedPathStr,
+          remainingPath);
+    } catch (IOException ex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          ex);
+      return null;
+    } catch (URISyntaxException uex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          uex);
+      return null;
+    }
+  }
+
+  /**
+   * Return resolution cache capacity.
+   *
+   * @return
+   */
+  public int getPathResolutionCacheCapacity() {
+    return pathResolutionCacheCapacity;
+  }
+
+  private void addResolveResultToCache(final String pathStr,
+      final Boolean resolveLastComponent,
+      final ResolveResult<T> resolveResult) {
+    try {
+      cacheRWLock.writeLock().lock();
+      String key = getResolveCacheKeyStr(pathStr, resolveLastComponent);
+      pathResolutionCache.put(key, resolveResult);
+    } finally {
+      cacheRWLock.writeLock().unlock();
+    }
+  }
+
+  private ResolveResult<T> getResolveResultFromCache(final String pathStr,
+      final Boolean resolveLastComponent) {
+    if (pathResolutionCacheCapacity <= 0) {
+      return null;
+    }
+    try {
+      cacheRWLock.readLock().lock();
+      String key = getResolveCacheKeyStr(pathStr, resolveLastComponent);
+      return (ResolveResult<T>) pathResolutionCache.get(key);
+    } finally {
+      cacheRWLock.readLock().unlock();
+    }
+  }
+
+  public static String getResolveCacheKeyStr(final String path,
+      Boolean resolveLastComp) {
+    return path + ",resolveLastComp" + resolveLastComp;
+  }
+

Review comment:
       Please move annotation to separate line below.

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
##########
@@ -1417,7 +1418,8 @@ public void testGetContentSummaryWithFileInLocalFS() 
throws Exception {
       fos.write(expected.getBytes());
     }
     ConfigUtil.addLink(conf,
-        "/internalDir/internalDir2/linkToLocalFile", localFile.toURI());

Review comment:
       seems like no change here. Can we avoid it?

##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
##########
@@ -646,102 +714,222 @@ boolean isInternalDir() {
   }
 
   /**
-   * Resolve the pathname p relative to root InodeDir
+   * Resolve the pathname p relative to root InodeDir.
    * @param p - input path
    * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining 
path
    * @throws FileNotFoundException
    */
   ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
       throws FileNotFoundException {
-    String[] path = breakIntoPathComponents(p);
-    if (path.length <= 1) { // special case for when path is "/"
-      T targetFs = root.isInternalDir() ?
-          getRootDir().getInternalDirFs() : 
getRootLink().getTargetFileSystem();
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-          targetFs, root.fullPath, SlashPath);
-      return res;
-    }
+    ResolveResult<T> resolveResult = null;
+    resolveResult = getResolveResultFromCache(p, resolveLastComponent);
+    if (resolveResult != null) {
+      return resolveResult;
+    }
+
+    try {
+      String[] path = breakIntoPathComponents(p);
+      if (path.length <= 1) { // special case for when path is "/"
+        T targetFs = root.isInternalDir() ?
+            getRootDir().getInternalDirFs()
+            : getRootLink().getTargetFileSystem();
+        resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+            targetFs, root.fullPath, SlashPath);
+        return resolveResult;
+      }
 
-    /**
-     * linkMergeSlash has been configured. The root of this mount table has
-     * been linked to the root directory of a file system.
-     * The first non-slash path component should be name of the mount table.
-     */
-    if (root.isLink()) {
-      Path remainingPath;
-      StringBuilder remainingPathStr = new StringBuilder();
-      // ignore first slash
-      for (int i = 1; i < path.length; i++) {
-        remainingPathStr.append("/").append(path[i]);
+      /**
+       * linkMergeSlash has been configured. The root of this mount table has
+       * been linked to the root directory of a file system.
+       * The first non-slash path component should be name of the mount table.
+       */
+      if (root.isLink()) {
+        Path remainingPath;
+        StringBuilder remainingPathStr = new StringBuilder();
+        // ignore first slash
+        for (int i = 1; i < path.length; i++) {
+          remainingPathStr.append("/").append(path[i]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+        resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+            getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
-      ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-          getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
-      return res;
-    }
-    Preconditions.checkState(root.isInternalDir());
-    INodeDir<T> curInode = getRootDir();
+      Preconditions.checkState(root.isInternalDir());
+      INodeDir<T> curInode = getRootDir();
 
-    int i;
-    // ignore first slash
-    for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
-      INode<T> nextInode = curInode.resolveInternal(path[i]);
-      if (nextInode == null) {
-        if (hasFallbackLink()) {
-          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-              getRootFallbackLink().getTargetFileSystem(),
-              root.fullPath, new Path(p));
-        } else {
-          StringBuilder failedAt = new StringBuilder(path[0]);
-          for (int j = 1; j <= i; ++j) {
-            failedAt.append('/').append(path[j]);
+      // Try to resolve path in the regex mount point
+      resolveResult = tryResolveInRegexMountpoint(p, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
+      }
+
+      int i;
+      // ignore first slash
+      for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
+        INode<T> nextInode = curInode.resolveInternal(path[i]);
+        if (nextInode == null) {
+          if (hasFallbackLink()) {
+            resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+                getRootFallbackLink().getTargetFileSystem(), root.fullPath,
+                new Path(p));
+            return resolveResult;
+          } else {
+            StringBuilder failedAt = new StringBuilder(path[0]);
+            for (int j = 1; j <= i; ++j) {
+              failedAt.append('/').append(path[j]);
+            }
+            throw (new FileNotFoundException(
+                "File/Directory does not exist: " + failedAt.toString()));
           }
-          throw (new FileNotFoundException(
-              "File/Directory does not exist: " + failedAt.toString()));
         }
-      }
 
-      if (nextInode.isLink()) {
-        final INodeLink<T> link = (INodeLink<T>) nextInode;
-        final Path remainingPath;
-        if (i >= path.length - 1) {
-          remainingPath = SlashPath;
-        } else {
-          StringBuilder remainingPathStr = new StringBuilder("/" + path[i + 
1]);
-          for (int j = i + 2; j < path.length; ++j) {
-            remainingPathStr.append('/').append(path[j]);
+        if (nextInode.isLink()) {
+          final INodeLink<T> link = (INodeLink<T>) nextInode;
+          final Path remainingPath;
+          if (i >= path.length - 1) {
+            remainingPath = SlashPath;
+          } else {
+            StringBuilder remainingPathStr =
+                new StringBuilder("/" + path[i + 1]);
+            for (int j = i + 2; j < path.length; ++j) {
+              remainingPathStr.append('/').append(path[j]);
+            }
+            remainingPath = new Path(remainingPathStr.toString());
           }
-          remainingPath = new Path(remainingPathStr.toString());
+          resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+              link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
+          return resolveResult;
+        } else if (nextInode.isInternalDir()) {
+          curInode = (INodeDir<T>) nextInode;
         }
-        final ResolveResult<T> res =
-            new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
-                link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
-        return res;
-      } else if (nextInode.isInternalDir()) {
-        curInode = (INodeDir<T>) nextInode;
+      }
+
+      // We have resolved to an internal dir in mount table.
+      Path remainingPath;
+      if (resolveLastComponent) {
+        remainingPath = SlashPath;
+      } else {
+        // note we have taken care of when path is "/" above
+        // for internal dirs rem-path does not start with / since the lookup
+        // that follows will do a children.get(remaningPath) and will have to
+        // strip-out the initial /
+        StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
+        for (int j = i + 1; j < path.length; ++j) {
+          remainingPathStr.append('/').append(path[j]);
+        }
+        remainingPath = new Path(remainingPathStr.toString());
+      }
+      resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+          curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
+      return resolveResult;
+    } finally {
+      if (pathResolutionCacheCapacity > 0 && resolveResult != null) {
+        addResolveResultToCache(p, resolveLastComponent, resolveResult);
       }
     }
+  }
 
-    // We have resolved to an internal dir in mount table.
-    Path remainingPath;
-    if (resolveLastComponent) {
-      remainingPath = SlashPath;
-    } else {
-      // note we have taken care of when path is "/" above
-      // for internal dirs rem-path does not start with / since the lookup
-      // that follows will do a children.get(remaningPath) and will have to
-      // strip-out the initial /
-      StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
-      for (int j = i + 1; j < path.length; ++j) {
-        remainingPathStr.append('/').append(path[j]);
+  /**
+   * Walk through all regex mount points to see
+   * whether the path match any regex expressions.
+   *
+   * @param srcPath
+   * @param resolveLastComponent
+   * @return
+   */
+  protected ResolveResult<T> tryResolveInRegexMountpoint(final String srcPath,
+      final boolean resolveLastComponent) {
+    for (RegexMountPoint regexMountPoint : regexMountPointList) {
+      ResolveResult resolveResult =
+          regexMountPoint.resolve(srcPath, resolveLastComponent);
+      if (resolveResult != null) {
+        return resolveResult;
       }
-      remainingPath = new Path(remainingPathStr.toString());
     }
-    final ResolveResult<T> res =
-        new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-            curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
-    return res;
+    return null;
+  }
+
+  /**
+   * Build resolve result return to caller.
+   *
+   * @param resultKind
+   * @param resolvedPathStr
+   * @param targetOfResolvedPathStr
+   * @param remainingPath
+   * @return
+   */
+  protected ResolveResult<T> buildResolveResultForRegexMountPoint(
+      ResultKind resultKind, String resolvedPathStr,
+      String targetOfResolvedPathStr, Path remainingPath) {
+    try {
+      T targetFs = getTargetFileSystem(new URI(targetOfResolvedPathStr));
+      return new ResolveResult<T>(resultKind, targetFs, resolvedPathStr,
+          remainingPath);
+    } catch (IOException ex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          ex);
+      return null;
+    } catch (URISyntaxException uex) {
+      LOGGER.error(String.format(
+          "Got Exception while build resolve result."
+              + " ResultKind:%s, resolvedPathStr:%s,"
+              + " targetOfResolvedPathStr:%s, remainingPath:%s,"
+              + " will return null.",
+          resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
+          uex);
+      return null;
+    }
+  }
+
+  /**
+   * Return resolution cache capacity.
+   *
+   * @return
+   */
+  public int getPathResolutionCacheCapacity() {
+    return pathResolutionCacheCapacity;
+  }
+
+  private void addResolveResultToCache(final String pathStr,
+      final Boolean resolveLastComponent,
+      final ResolveResult<T> resolveResult) {
+    try {
+      cacheRWLock.writeLock().lock();

Review comment:
       below key construction can be outside of lock?

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
##########
@@ -366,6 +366,82 @@ Don't want to change scheme or difficult to copy 
mount-table configurations to a
 
 Please refer to the [View File System Overload Scheme 
Guide](./ViewFsOverloadScheme.html)
 
+Regex Pattern Based Mount Points
+--------------------------------
+
+The view file system mount points were a Key-Value based mapping system. It is 
not friendly for user cases which mapping config could be abstracted to rules. 
E.g. Users want to provide a GCS bucket per user and there might be thousands 
of users in total. The old key-value based approach won't work well for several 
reasons:
+
+1. The mount table is used by FileSystem clients. There's a cost to spread the 
config to all clients and we should avoid it if possible. The [View File System 
Overload Scheme Guide](./ViewFsOverloadScheme.html) could help the distribution 
by central mount table management. But the mount table still have to be updated 
on every change. The change could be greatly avoided if provide a rule-based 
mount table..
+
+2. The client have to understand all the KVs in the mount table. This is not 
ideal when the mountable grows to thousands of items. E.g. thousands of file 
systems might be initialized even users only need one. And the config itself 
will become bloated at scale.

Review comment:
       I agree with this issue and filed a JIRA for it. 
https://issues.apache.org/jira/browse/HADOOP-17028
   This issue should be solved once implement that.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkRegex.java
##########
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
+import static 
org.apache.hadoop.fs.viewfs.RegexMountPoint.INTERCEPTOR_INTERNAL_SEP;
+
+/**
+ * Test linkRegex node type for view file system.
+ */
+public class TestViewFileSystemLinkRegex extends ViewFileSystemBaseTest {
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(TestViewFileSystemLinkRegex.class);
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static Configuration clusterConfig;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final FileSystem[] FS_HDFS = new 
FileSystem[NAME_SPACES_COUNT];
+  private static final String CLUSTER_NAME =
+      "TestViewFileSystemLinkRegexCluster";
+  private static final File TEST_DIR = GenericTestUtils
+      .getTestDir(TestViewFileSystemLinkRegex.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkRegex";
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException {
+    SupportsBlocks = true;
+    clusterConfig = ViewFileSystemTestSetup.createConfig();
+    clusterConfig.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(clusterConfig).nnTopology(
+        MiniDFSNNTopology.simpleFederatedTopology(NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT).build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under 
the
+   * root of the FS, and so that we don't try to delete the test dir, but 
rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  public String buildReplaceInterceptorSettingString(String srcRegex,
+      String replaceString) {
+    return
+        
RegexMountPointInterceptorType.REPLACE_RESOLVED_DST_PATH.getConfigName()
+            + INTERCEPTOR_INTERNAL_SEP + srcRegex + INTERCEPTOR_INTERNAL_SEP
+            + replaceString;
+  }
+
+  public String linkInterceptorSettings(
+      List<String> interceptorSettingStrList) {
+    StringBuilder stringBuilder = new StringBuilder();
+    int listSize = interceptorSettingStrList.size();
+    for (int i = 0; i < listSize; ++i) {
+      stringBuilder.append(interceptorSettingStrList.get(i));
+      if (i < listSize - 1) {
+        stringBuilder.append(RegexMountPoint.INTERCEPTOR_SEP);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  @Test
+  public void testConfLinkRegexIndexMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // (^/(\w+),/targetTestRoot/$1)
+    // => /targetTestRoot/testConfLinkRegexIndexMapping1
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(\\w+)";
+    String dstPathStr = targetTestRoot + "$1";
+    Path srcPath = new Path("/testConfLinkRegexIndexMapping1");
+    Path expectedResolveResult =
+        new Path(dstPathStr.replace("$1", "testConfLinkRegexIndexMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    // ^/(\w+, /targetTestRoot/${1})
+    // => /targetTestRoot/testConfLinkRegexIndexMapping2
+    dstPathStr = targetTestRoot + "${1}";
+    srcPath = new Path("/testConfLinkRegexIndexMapping2");
+    expectedResolveResult =
+        new Path(dstPathStr.replace("${1}", "testConfLinkRegexIndexMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping3/file1
+    dstPathStr = targetTestRoot + "$1";
+    srcPath = new Path("/testConfLinkRegexIndexMapping3/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping3/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);

Review comment:
       same as above comment

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestRegexMountPointResolvedDstPathReplaceInterceptor.java
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static 
org.apache.hadoop.fs.viewfs.RegexMountPointInterceptorType.REPLACE_RESOLVED_DST_PATH;
+
+/**
+ * Test RegexMountPointResolvedDstPathReplaceInterceptor.
+ */
+public class TestRegexMountPointResolvedDstPathReplaceInterceptor {
+
+  public String createSerializedString(String regex, String replaceString) {
+    return REPLACE_RESOLVED_DST_PATH.getConfigName()
+        + RegexMountPoint.INTERCEPTOR_INTERNAL_SEP + regex
+        + RegexMountPoint.INTERCEPTOR_INTERNAL_SEP + replaceString;
+  }
+
+  @Test public void testDeserializeFromStringNormalCase() throws IOException {
+    String srcRegex = "-";
+    String replaceString = "_";
+    String serializedString = createSerializedString(srcRegex, replaceString);
+    RegexMountPointResolvedDstPathReplaceInterceptor interceptor =
+        RegexMountPointResolvedDstPathReplaceInterceptor
+            .deserializeFromString(serializedString);
+    Assert.assertTrue(interceptor.getSrcRegexString().equals(srcRegex));
+    Assert.assertTrue(interceptor.getReplaceString().equals(replaceString));
+    Assert.assertTrue(interceptor.getSrcRegexPattern() == null);
+    interceptor.initialize();
+    Assert.assertTrue(
+        interceptor.getSrcRegexPattern().toString().equals(srcRegex));
+  }
+
+  @Test public void testDeserializeFromStringBadCase() throws IOException {
+    String srcRegex = "-";
+    String replaceString = "_";
+    String serializedString = createSerializedString(srcRegex, replaceString);
+    serializedString = serializedString + ":ddd";
+    RegexMountPointResolvedDstPathReplaceInterceptor interceptor =
+        RegexMountPointResolvedDstPathReplaceInterceptor
+            .deserializeFromString(serializedString);
+    Assert.assertEquals(interceptor, null);
+  }
+
+  @Test public void testSerialization() {
+    String srcRegex = "word1";
+    String replaceString = "word2";
+    String serializedString = createSerializedString(srcRegex, replaceString);
+    RegexMountPointResolvedDstPathReplaceInterceptor interceptor =
+        new RegexMountPointResolvedDstPathReplaceInterceptor(srcRegex,
+            replaceString);
+    Assert.assertEquals(interceptor.serializeToString(), serializedString);
+  }
+
+  @Test public void testInterceptSource() {

Review comment:
       Nits: please add annotation in separate line above. You may want to 
change for all such lines below and above.

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
##########
@@ -366,6 +366,82 @@ Don't want to change scheme or difficult to copy 
mount-table configurations to a
 
 Please refer to the [View File System Overload Scheme 
Guide](./ViewFsOverloadScheme.html)
 
+Regex Pattern Based Mount Points
+--------------------------------
+
+The view file system mount points were a Key-Value based mapping system. It is 
not friendly for user cases which mapping config could be abstracted to rules. 
E.g. Users want to provide a GCS bucket per user and there might be thousands 
of users in total. The old key-value based approach won't work well for several 
reasons:
+
+1. The mount table is used by FileSystem clients. There's a cost to spread the 
config to all clients and we should avoid it if possible. The [View File System 
Overload Scheme Guide](./ViewFsOverloadScheme.html) could help the distribution 
by central mount table management. But the mount table still have to be updated 
on every change. The change could be greatly avoided if provide a rule-based 
mount table..
+
+2. The client have to understand all the KVs in the mount table. This is not 
ideal when the mountable grows to thousands of items. E.g. thousands of file 
systems might be initialized even users only need one. And the config itself 
will become bloated at scale.
+
+### Understand the Difference
+
+In the key-value based mount table, view file system treats every mount point 
as a partition. There's several file system APIs which will lead to operation 
on all partitions. E.g. there's an HDFS cluster with multiple mount. Users want 
to run “hadoop fs -put file viewfs://hdfs.namenode.apache.org/tmp/” cmd to copy 
data from local disk to our HDFS cluster. The cmd will trigger ViewFileSystem 
to call setVerifyChecksum() method which will initialize the file system for 
every mount point.
+For a regex-base rule mount table entry, we couldn't know what's corresponding 
path until parsing. So the regex based mount table entry will be ignored on 
such cases and the file system will be created upon accessing. The inner cache 
of ViewFs is also not available for regex-base mount points now as it assumes 
target file system doesn't change after viewfs initialization. Please disable 
it if you want to use regex-base mount table. We also need to change the rename 
strategy to SAME_FILESYSTEM_ACROSS_MOUNTPOINT for the same reason.
+```xml
+<property>
+    <name>fs.viewfs.enable.inner.cache</name>
+    <value>false</value>

Review comment:
       Instead of asking users to disable, isn't it the better way is to add 
getTargetFileSystem(uri, userCache=true/false) method?
   So, that the users already using other mount points need not disable for the 
sake if regex based mount points right?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkRegex.java
##########
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
+import static 
org.apache.hadoop.fs.viewfs.RegexMountPoint.INTERCEPTOR_INTERNAL_SEP;
+
+/**
+ * Test linkRegex node type for view file system.
+ */
+public class TestViewFileSystemLinkRegex extends ViewFileSystemBaseTest {
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(TestViewFileSystemLinkRegex.class);
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static Configuration clusterConfig;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final FileSystem[] FS_HDFS = new 
FileSystem[NAME_SPACES_COUNT];
+  private static final String CLUSTER_NAME =
+      "TestViewFileSystemLinkRegexCluster";
+  private static final File TEST_DIR = GenericTestUtils
+      .getTestDir(TestViewFileSystemLinkRegex.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkRegex";
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException {
+    SupportsBlocks = true;
+    clusterConfig = ViewFileSystemTestSetup.createConfig();
+    clusterConfig.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(clusterConfig).nnTopology(
+        MiniDFSNNTopology.simpleFederatedTopology(NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT).build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under 
the
+   * root of the FS, and so that we don't try to delete the test dir, but 
rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  public String buildReplaceInterceptorSettingString(String srcRegex,
+      String replaceString) {
+    return
+        
RegexMountPointInterceptorType.REPLACE_RESOLVED_DST_PATH.getConfigName()
+            + INTERCEPTOR_INTERNAL_SEP + srcRegex + INTERCEPTOR_INTERNAL_SEP
+            + replaceString;
+  }
+
+  public String linkInterceptorSettings(
+      List<String> interceptorSettingStrList) {
+    StringBuilder stringBuilder = new StringBuilder();
+    int listSize = interceptorSettingStrList.size();
+    for (int i = 0; i < listSize; ++i) {
+      stringBuilder.append(interceptorSettingStrList.get(i));
+      if (i < listSize - 1) {
+        stringBuilder.append(RegexMountPoint.INTERCEPTOR_SEP);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  @Test
+  public void testConfLinkRegexIndexMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // (^/(\w+),/targetTestRoot/$1)
+    // => /targetTestRoot/testConfLinkRegexIndexMapping1
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(\\w+)";
+    String dstPathStr = targetTestRoot + "$1";
+    Path srcPath = new Path("/testConfLinkRegexIndexMapping1");
+    Path expectedResolveResult =
+        new Path(dstPathStr.replace("$1", "testConfLinkRegexIndexMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    // ^/(\w+, /targetTestRoot/${1})
+    // => /targetTestRoot/testConfLinkRegexIndexMapping2
+    dstPathStr = targetTestRoot + "${1}";
+    srcPath = new Path("/testConfLinkRegexIndexMapping2");
+    expectedResolveResult =
+        new Path(dstPathStr.replace("${1}", "testConfLinkRegexIndexMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping3/file1
+    dstPathStr = targetTestRoot + "$1";
+    srcPath = new Path("/testConfLinkRegexIndexMapping3/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping3/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1/)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping4/file1
+    dstPathStr = targetTestRoot + "$1/";
+    srcPath = new Path("/testConfLinkRegexIndexMapping4/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping4/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);

Review comment:
       you may want to close vfs as well?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkRegex.java
##########
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
+import static 
org.apache.hadoop.fs.viewfs.RegexMountPoint.INTERCEPTOR_INTERNAL_SEP;
+
+/**
+ * Test linkRegex node type for view file system.
+ */
+public class TestViewFileSystemLinkRegex extends ViewFileSystemBaseTest {
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(TestViewFileSystemLinkRegex.class);
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static Configuration clusterConfig;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final FileSystem[] FS_HDFS = new 
FileSystem[NAME_SPACES_COUNT];
+  private static final String CLUSTER_NAME =
+      "TestViewFileSystemLinkRegexCluster";
+  private static final File TEST_DIR = GenericTestUtils
+      .getTestDir(TestViewFileSystemLinkRegex.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkRegex";
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException {
+    SupportsBlocks = true;
+    clusterConfig = ViewFileSystemTestSetup.createConfig();
+    clusterConfig.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(clusterConfig).nnTopology(
+        MiniDFSNNTopology.simpleFederatedTopology(NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT).build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under 
the
+   * root of the FS, and so that we don't try to delete the test dir, but 
rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  public String buildReplaceInterceptorSettingString(String srcRegex,
+      String replaceString) {
+    return
+        
RegexMountPointInterceptorType.REPLACE_RESOLVED_DST_PATH.getConfigName()
+            + INTERCEPTOR_INTERNAL_SEP + srcRegex + INTERCEPTOR_INTERNAL_SEP
+            + replaceString;
+  }
+
+  public String linkInterceptorSettings(
+      List<String> interceptorSettingStrList) {
+    StringBuilder stringBuilder = new StringBuilder();
+    int listSize = interceptorSettingStrList.size();
+    for (int i = 0; i < listSize; ++i) {
+      stringBuilder.append(interceptorSettingStrList.get(i));
+      if (i < listSize - 1) {
+        stringBuilder.append(RegexMountPoint.INTERCEPTOR_SEP);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  @Test
+  public void testConfLinkRegexIndexMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // (^/(\w+),/targetTestRoot/$1)
+    // => /targetTestRoot/testConfLinkRegexIndexMapping1
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(\\w+)";
+    String dstPathStr = targetTestRoot + "$1";
+    Path srcPath = new Path("/testConfLinkRegexIndexMapping1");
+    Path expectedResolveResult =
+        new Path(dstPathStr.replace("$1", "testConfLinkRegexIndexMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    // ^/(\w+, /targetTestRoot/${1})
+    // => /targetTestRoot/testConfLinkRegexIndexMapping2
+    dstPathStr = targetTestRoot + "${1}";
+    srcPath = new Path("/testConfLinkRegexIndexMapping2");
+    expectedResolveResult =
+        new Path(dstPathStr.replace("${1}", "testConfLinkRegexIndexMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping3/file1
+    dstPathStr = targetTestRoot + "$1";
+    srcPath = new Path("/testConfLinkRegexIndexMapping3/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping3/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1/)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping4/file1
+    dstPathStr = targetTestRoot + "$1/";
+    srcPath = new Path("/testConfLinkRegexIndexMapping4/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping4/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+  }
+
+  @Test
+  public void testConfLinkRegexNamedGroupMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // ^/(?<firstDir>\\w+) = > /targetTestRoot/$firstDir
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(?<firstDir>\\w+)";
+    String dstPathStr = targetTestRoot + "$firstDir";
+    Path srcPath = new Path("/testConfLinkRegexNamedGroupMapping1");
+    Path expectedResolveResult = new Path(
+        dstPathStr.replace("$firstDir", 
"testConfLinkRegexNamedGroupMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    dstPathStr = targetTestRoot + "${firstDir}";
+    srcPath = new Path("/testConfLinkRegexNamedGroupMapping2");
+    expectedResolveResult = new Path(dstPathStr
+        .replace("${firstDir}", "testConfLinkRegexNamedGroupMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);

Review comment:
       Could we assert the results of listStatus. Probably create few files 
under that expected target path. So, the ls will get some file and assert to 
make sure it's getting from target.

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkRegex.java
##########
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
+import static 
org.apache.hadoop.fs.viewfs.RegexMountPoint.INTERCEPTOR_INTERNAL_SEP;
+
+/**
+ * Test linkRegex node type for view file system.
+ */
+public class TestViewFileSystemLinkRegex extends ViewFileSystemBaseTest {
+  public static final Logger LOGGER =
+      LoggerFactory.getLogger(TestViewFileSystemLinkRegex.class);
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static Configuration clusterConfig;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final FileSystem[] FS_HDFS = new 
FileSystem[NAME_SPACES_COUNT];
+  private static final String CLUSTER_NAME =
+      "TestViewFileSystemLinkRegexCluster";
+  private static final File TEST_DIR = GenericTestUtils
+      .getTestDir(TestViewFileSystemLinkRegex.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkRegex";
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException {
+    SupportsBlocks = true;
+    clusterConfig = ViewFileSystemTestSetup.createConfig();
+    clusterConfig.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(clusterConfig).nnTopology(
+        MiniDFSNNTopology.simpleFederatedTopology(NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT).build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under 
the
+   * root of the FS, and so that we don't try to delete the test dir, but 
rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  public String buildReplaceInterceptorSettingString(String srcRegex,
+      String replaceString) {
+    return
+        
RegexMountPointInterceptorType.REPLACE_RESOLVED_DST_PATH.getConfigName()
+            + INTERCEPTOR_INTERNAL_SEP + srcRegex + INTERCEPTOR_INTERNAL_SEP
+            + replaceString;
+  }
+
+  public String linkInterceptorSettings(
+      List<String> interceptorSettingStrList) {
+    StringBuilder stringBuilder = new StringBuilder();
+    int listSize = interceptorSettingStrList.size();
+    for (int i = 0; i < listSize; ++i) {
+      stringBuilder.append(interceptorSettingStrList.get(i));
+      if (i < listSize - 1) {
+        stringBuilder.append(RegexMountPoint.INTERCEPTOR_SEP);
+      }
+    }
+    return stringBuilder.toString();
+  }
+
+  @Test
+  public void testConfLinkRegexIndexMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // (^/(\w+),/targetTestRoot/$1)
+    // => /targetTestRoot/testConfLinkRegexIndexMapping1
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(\\w+)";
+    String dstPathStr = targetTestRoot + "$1";
+    Path srcPath = new Path("/testConfLinkRegexIndexMapping1");
+    Path expectedResolveResult =
+        new Path(dstPathStr.replace("$1", "testConfLinkRegexIndexMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    // ^/(\w+, /targetTestRoot/${1})
+    // => /targetTestRoot/testConfLinkRegexIndexMapping2
+    dstPathStr = targetTestRoot + "${1}";
+    srcPath = new Path("/testConfLinkRegexIndexMapping2");
+    expectedResolveResult =
+        new Path(dstPathStr.replace("${1}", "testConfLinkRegexIndexMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping3/file1
+    dstPathStr = targetTestRoot + "$1";
+    srcPath = new Path("/testConfLinkRegexIndexMapping3/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping3/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    //(^/(\w+)/file1, /targetTestRoot/$1/)
+    // = > /targetTestRoot/testConfLinkRegexIndexMapping4/file1
+    dstPathStr = targetTestRoot + "$1/";
+    srcPath = new Path("/testConfLinkRegexIndexMapping4/file1");
+    expectedResolveResult = new Path(
+        dstPathStr.replace("$1", "testConfLinkRegexIndexMapping4/file1"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+  }
+
+  @Test
+  public void testConfLinkRegexNamedGroupMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // ^/(?<firstDir>\\w+) = > /targetTestRoot/$firstDir
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/(?<firstDir>\\w+)";
+    String dstPathStr = targetTestRoot + "$firstDir";
+    Path srcPath = new Path("/testConfLinkRegexNamedGroupMapping1");
+    Path expectedResolveResult = new Path(
+        dstPathStr.replace("$firstDir", 
"testConfLinkRegexNamedGroupMapping1"));
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+
+    // Test ${1} format
+    dstPathStr = targetTestRoot + "${firstDir}";
+    srcPath = new Path("/testConfLinkRegexNamedGroupMapping2");
+    expectedResolveResult = new Path(dstPathStr
+        .replace("${firstDir}", "testConfLinkRegexNamedGroupMapping2"));
+    outputStream = fsTarget.create(expectedResolveResult);
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+  }
+
+  @Test
+  public void testConfLinkRegexFixedDestMapping() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/\\w+";
+    String dstPathStr =
+        targetTestRoot + "testConfLinkRegexFixedDestMappingFile";
+    Path expectedResolveResult = new Path(dstPathStr);
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil.addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, null);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(
+        expectedResolveResult.equals(vfs.resolvePath(new Path("/misc1"))));
+    Assert.assertTrue(
+        expectedResolveResult.equals(vfs.resolvePath(new Path("/misc2"))));
+  }
+
+  @Test
+  public void testConfLinkRegexWithSingleInterceptor() throws Exception {
+    conf.setBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    URI viewFsUri =
+        new URI(FsConstants.VIEWFS_SCHEME, CLUSTER_NAME, "/", null, null);
+    String regexStr = "^/user/(?<username>\\w+)";
+    String dstPathStr = targetTestRoot + "$username";
+    // Replace "_" with "-"
+    String settingString = buildReplaceInterceptorSettingString("_", "-");
+    Path srcPath = new Path("/user/hadoop_user1/hadoop_file1");
+    Path expectedResolveResult =
+        new Path(targetTestRoot, "hadoop-user1/hadoop_file1");
+    FSDataOutputStream outputStream = fsTarget.create((expectedResolveResult));
+    fsTarget.listStatus(expectedResolveResult);
+    outputStream.close();
+    ConfigUtil
+        .addLinkRegex(conf, CLUSTER_NAME, regexStr, dstPathStr, settingString);
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    Assert.assertTrue(expectedResolveResult.equals(vfs.resolvePath(srcPath)));
+    Assert.assertEquals(0L, vfs.getFileStatus(srcPath).getLen());
+  }
+

Review comment:
       Do you mind writing some description test steps in javadoc. It will be 
helpful to better understand about scenarios.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to