OOZIE-2601 Ability to use local paths for the sharelib
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/25a8b99d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/25a8b99d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/25a8b99d Branch: refs/heads/master Commit: 25a8b99d5197c4e18acf0fd332c4396450d3d551 Parents: 18694b6 Author: Attila Sasvari <asasv...@cloudera.com> Authored: Thu Jul 20 13:20:47 2017 +0200 Committer: Attila Sasvari <asasv...@apache.org> Committed: Thu Jul 20 17:18:20 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 109 ++- .../apache/oozie/service/ShareLibService.java | 75 +- .../org/apache/oozie/util/ClasspathUtils.java | 27 + .../java/org/apache/oozie/util/FSUtils.java | 14 + core/src/main/resources/oozie-default.xml | 3 +- .../service/TestShareLibMappingFileInput.java | 88 ++ .../oozie/service/TestShareLibService.java | 873 ++++++++++--------- docs/src/site/twiki/AG_Install.twiki | 30 +- release-log.txt | 1 + 9 files changed, 724 insertions(+), 496 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/25a8b99d/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 89bac95..23e1f69 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -18,44 +18,24 @@ package org.apache.oozie.action.hadoop; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.StringReader; -import java.net.ConnectException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; - -import org.apache.commons.io.IOUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Closeables; import com.google.common.primitives.Ints; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -95,6 +75,7 @@ import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.ClasspathUtils; import org.apache.oozie.util.ELEvaluationException; import org.apache.oozie.util.ELEvaluator; +import org.apache.oozie.util.FSUtils; import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.PropertiesUtils; @@ -105,8 +86,28 @@ import org.jdom.Element; import org.jdom.JDOMException; import org.jdom.Namespace; -import com.google.common.collect.ImmutableList; -import com.google.common.io.Closeables; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; public class JavaActionExecutor extends ActionExecutor { @@ -463,8 +464,11 @@ public class JavaActionExecutor extends ActionExecutor { else if (fileName.endsWith(".jar")) { // .jar files if (!fileName.contains("#")) { String user = conf.get("user.name"); - Path pathToAdd = new Path(uri.normalize()); - Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf); + + if (FSUtils.isNotLocalFile(fileName)) { + Path pathToAdd = new Path(uri.normalize()); + Services.get().get(HadoopAccessorService.class).addFileToClassPath(user, pathToAdd, conf); + } } else { DistributedCache.addCacheFile(uri.normalize(), conf); @@ -585,9 +589,7 @@ public class JavaActionExecutor extends ActionExecutor { } } } - for (Path libPath : sharelibList) { - addToCache(conf, libPath, libPath.toUri().getPath(), false); - } + addLibPathsToCache(conf, sharelibList); } catch (URISyntaxException ex) { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "Error configuring sharelib", @@ -610,18 +612,8 @@ public class JavaActionExecutor extends ActionExecutor { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001", "Could not locate Oozie sharelib"); } - FileSystem fs = listOfPaths.get(0).getFileSystem(conf); - for (Path actionLibPath : listOfPaths) { - JobUtils.addFileToClassPath(actionLibPath, conf, fs); - DistributedCache.createSymlink(conf); - } - listOfPaths = shareLibService.getSystemLibJars(getType()); - if (!listOfPaths.isEmpty()) { - for (Path actionLibPath : listOfPaths) { - JobUtils.addFileToClassPath(actionLibPath, conf, fs); - DistributedCache.createSymlink(conf); - } - } + addLibPathsToClassPath(conf, listOfPaths); + addLibPathsToClassPath(conf, shareLibService.getSystemLibJars(getType())); } catch (IOException ex) { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", @@ -630,6 +622,37 @@ public class JavaActionExecutor extends ActionExecutor { } } + private void addLibPathsToClassPath(Configuration conf, List<Path> listOfPaths) + throws IOException, ActionExecutorException { + addLibPathsToClassPathOrCache(conf, listOfPaths, false); + } + + private void addLibPathsToCache(Configuration conf, Set<Path> sharelibList) + throws IOException, ActionExecutorException { + addLibPathsToClassPathOrCache(conf, sharelibList, true); + } + + + private void addLibPathsToClassPathOrCache(Configuration conf, Collection<Path> sharelibList, boolean isAddToCache) + throws IOException, ActionExecutorException { + + for (Path libPath : sharelibList) { + if (FSUtils.isLocalFile(libPath.toString())) { + conf = ClasspathUtils.addToClasspathFromLocalShareLib(conf, libPath); + } + else { + if (isAddToCache) { + addToCache(conf, libPath, libPath.toUri().getPath(), false); + } + else { + FileSystem fs = libPath.getFileSystem(conf); + JobUtils.addFileToClassPath(libPath, conf, fs); + DistributedCache.createSymlink(conf); + } + } + } + } + protected void addActionLibs(Path appPath, Configuration conf) throws ActionExecutorException { String[] actionLibsStrArr = conf.getStrings("oozie.launcher.oozie.libpath"); if (actionLibsStrArr != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/25a8b99d/core/src/main/java/org/apache/oozie/service/ShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java index 82a0aff..8986a49 100644 --- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java +++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java @@ -45,6 +45,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; @@ -60,6 +61,8 @@ import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; import org.jdom.JDOMException; +import static org.apache.oozie.util.FSUtils.isLocalFile; + public class ShareLibService implements Service, Instrumentable { public static final String LAUNCHERJAR_LIB_RETENTION = CONF_PREFIX + "ShareLibService.temp.sharelib.retention.days"; @@ -108,6 +111,7 @@ public class ShareLibService implements Service, Instrumentable { private String sharelibDirOld; FileSystem fs; + FileSystem localFs; final long retentionTime = 1000 * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION); @@ -121,7 +125,9 @@ public class ShareLibService implements Service, Instrumentable { HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); URI uri = launcherlibPath.toUri(); try { + fs = FileSystem.get(has.createConfiguration(uri.getAuthority())); + localFs = LocalFileSystem.get(new Configuration(false)); //cache action key sharelib conf list cacheActionKeySharelibConfList(); updateLauncherLib(); @@ -350,24 +356,26 @@ public class ShareLibService implements Service, Instrumentable { return shareLibMap.get(shareLibKey); } - private void checkSymlink(String shareLibKey) throws IOException { + private void checkSymlink(final String shareLibKey) throws IOException { if (symlinkMapping.get(shareLibKey) == null || symlinkMapping.get(shareLibKey).isEmpty()) { return; } - for (Path path : symlinkMapping.get(shareLibKey).keySet()) { - if (!symlinkMapping.get(shareLibKey).get(path).equals(FSUtils.getSymLinkTarget(fs, path))) { + for (final Path symlinkPath : symlinkMapping.get(shareLibKey).keySet()) { + final FileSystem fileSystem = getHostFileSystem(symlinkPath); + final Path symLinkTarget = FSUtils.getSymLinkTarget(fileSystem, symlinkPath); + final boolean symlinkIsNotTarget = !getSymlinkSharelibPath(shareLibKey, symlinkPath).equals(symLinkTarget); + if (symlinkIsNotTarget) { synchronized (ShareLibService.class) { - Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap); + final Map<String, List<Path>> tmpShareLibMap = new HashMap<String, List<Path>>(shareLibMap); - Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>( - shareLibConfigMap); + final Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<>(shareLibConfigMap); - Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>( + final Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>( symlinkMapping); LOG.info(MessageFormat.format("Symlink target for [{0}] has changed, was [{1}], now [{2}]", - shareLibKey, path, FSUtils.getSymLinkTarget(fs, path))); + shareLibKey, symlinkPath, symLinkTarget)); loadShareLibMetaFile(tmpShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, shareLibKey); shareLibMap = tmpShareLibMap; @@ -375,10 +383,27 @@ public class ShareLibService implements Service, Instrumentable { shareLibConfigMap = tmpShareLibConfigMap; return; } - } } + } + private Path getSymlinkSharelibPath(String shareLibKey, Path path) { + return symlinkMapping.get(shareLibKey).get(path); + } + + private FileSystem getHostFileSystem(String pathStr) { + FileSystem fileSystem; + if (isLocalFile(pathStr)) { + fileSystem = localFs; + } + else { + fileSystem = fs; + } + return fileSystem; + } + + private FileSystem getHostFileSystem(Path path) { + return getHostFileSystem(path.toString()); } /** @@ -534,9 +559,13 @@ public class ShareLibService implements Service, Instrumentable { Map<String, Map<Path, Path>> tmpSymlinkMapping = new HashMap<String, Map<Path, Path>>(); Map<String, Map<Path, Configuration>> tmpShareLibConfigMap = new HashMap<String, Map<Path, Configuration>>(); - if (!StringUtils.isEmpty(sharelibMappingFile.trim())) { + String trimmedSharelibMappingFile = sharelibMappingFile.trim(); + if (!StringUtils.isEmpty(trimmedSharelibMappingFile)) { + FileSystem fileSystem = getHostFileSystem(trimmedSharelibMappingFile); + String sharelibMetaFileNewTimeStamp = JsonUtils.formatDateRfc822( - new Date(fs.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT"); + new Date(fileSystem.getFileStatus(new Path(sharelibMappingFile)).getModificationTime()), "GMT"); + loadShareLibMetaFile(tempShareLibMap, tmpSymlinkMapping, tmpShareLibConfigMap, sharelibMappingFile, null); status.put("sharelibMetaFile", sharelibMappingFile); status.put("sharelibMetaFileNewTimeStamp", sharelibMetaFileNewTimeStamp); @@ -612,8 +641,8 @@ public class ShareLibService implements Service, Instrumentable { throws IOException { Path shareFileMappingPath = new Path(sharelibFileMapping); - HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); - FileSystem filesystem = FileSystem.get(has.createConfiguration(shareFileMappingPath.toUri().getAuthority())); + FileSystem filesystem = getHostFileSystem(shareFileMappingPath); + Properties prop = new Properties(); prop.load(filesystem.open(new Path(sharelibFileMapping))); @@ -634,11 +663,13 @@ public class ShareLibService implements Service, Instrumentable { List<Path> listOfPaths = new ArrayList<Path>(); Map<Path, Path> symlinkMappingforAction = new HashMap<Path, Path>(); - for (String dfsPath : pathList) { - Path path = new Path(dfsPath); - getPathRecursively(fs, new Path(dfsPath), listOfPaths, shareLibKey, shareLibConfigMap); - if (FSUtils.isSymlink(fs, path)) { - symlinkMappingforAction.put(path, FSUtils.getSymLinkTarget(fs, path)); + for (String pathStr : pathList) { + Path path = new Path(pathStr); + final FileSystem fileSystem = getHostFileSystem(pathStr); + + getPathRecursively(fileSystem, path, listOfPaths, shareLibKey, shareLibConfigMap); + if (FSUtils.isSymlink(fileSystem, path)) { + symlinkMappingforAction.put(path, FSUtils.getSymLinkTarget(fileSystem, path)); } } @@ -825,21 +856,21 @@ public class ShareLibService implements Service, Instrumentable { /** * Cache XML conf file * - * @param hdfsPath the hdfs path + * @param propertyFilePath the path of the property file * @param shareLibKey the share lib key * @throws IOException Signals that an I/O exception has occurred. * @throws JDOMException */ - private void cachePropertyFile(Path qualifiedHdfsPath, Path hdfsPath, String shareLibKey, + private void cachePropertyFile(Path qualifiedHdfsPath, Path propertyFilePath, String shareLibKey, Map<String, Map<Path, Configuration>> shareLibConfigMap) throws IOException, JDOMException { Map<Path, Configuration> confMap = shareLibConfigMap.get(shareLibKey); if (confMap == null) { confMap = new HashMap<Path, Configuration>(); shareLibConfigMap.put(shareLibKey, confMap); } - Configuration xmlConf = new XConfiguration(fs.open(hdfsPath)); + FileSystem fileSystem = getHostFileSystem(propertyFilePath); + Configuration xmlConf = new XConfiguration(fileSystem.open(propertyFilePath)); confMap.put(qualifiedHdfsPath, xmlConf); - } private void cacheActionKeySharelibConfList() { http://git-wip-us.apache.org/repos/asf/oozie/blob/25a8b99d/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java index 5833607..bfe2f3e 100644 --- a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java +++ b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java @@ -29,6 +29,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import java.io.File; import java.io.IOException; import java.net.URI; import java.util.Arrays; @@ -37,6 +38,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; + public class ClasspathUtils { private static boolean usingMiniYarnCluster = false; private static final List<String> CLASSPATH_ENTRIES = Arrays.asList( @@ -139,4 +141,29 @@ public class ClasspathUtils { } } } + + public static Configuration addToClasspathFromLocalShareLib(Configuration conf, Path libPath) { + if (conf == null) { + conf = new Configuration(false); + } + final String pathStr = normalizedLocalFsPath(libPath); + + String appClassPath = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH); + + if (org.apache.commons.lang.StringUtils.isEmpty(appClassPath)) { + addPathToYarnClasspathInConfig(conf, pathStr, StringUtils.join(File.pathSeparator, + YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)); + } else { + addPathToYarnClasspathInConfig(conf, pathStr, appClassPath); + } + return conf; + } + + private static void addPathToYarnClasspathInConfig(Configuration conf, String pathStr, String appClassPath) { + conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, appClassPath + File.pathSeparator + pathStr); + } + + private static String normalizedLocalFsPath(Path libPath) { + return org.apache.commons.lang.StringUtils.replace(libPath.toString(), FSUtils.FILE_SCHEME_PREFIX, ""); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/25a8b99d/core/src/main/java/org/apache/oozie/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/FSUtils.java b/core/src/main/java/org/apache/oozie/util/FSUtils.java index 6d73fc7..c25d931 100644 --- a/core/src/main/java/org/apache/oozie/util/FSUtils.java +++ b/core/src/main/java/org/apache/oozie/util/FSUtils.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.net.URI; public final class FSUtils { + public static final String FILE_SCHEME_PREFIX = "file:"; + public static Path getSymLinkTarget(FileSystem fs, Path p) throws IOException { try { //getSymlink doesn't work with fragment name, need to remove fragment before calling getSymlink @@ -50,4 +52,16 @@ public final class FSUtils { public static void createSymlink(FileSystem fs, Path target, Path link, boolean createParent) throws IOException { fs.createSymlink(target, link, createParent); } + + public static boolean isLocalFile(String fileName) { + return fileName.startsWith(FILE_SCHEME_PREFIX + "/"); + } + + public static boolean isLocalFile(Path filePath) { + return isLocalFile(filePath.toString()); + } + + public static boolean isNotLocalFile(String fileName) { + return !isLocalFile(fileName); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/25a8b99d/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 832bbe1..b59a2e6 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2694,11 +2694,12 @@ will be the requeue interval for the actions which are waiting for a long time w <description> Sharelib mapping files contains list of key=value, where key will be the sharelib name for the action and value is a comma separated list of - DFS directories or jar files. + DFS or local filesystem directories or jar files. Example. oozie.pig_10=hdfs:///share/lib/pig/pig-0.10.1/lib/ oozie.pig=hdfs:///share/lib/pig/pig-0.11.1/lib/ oozie.distcp=hdfs:///share/lib/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-distcp-2.2.0.jar + oozie.hive=file:///usr/local/oozie/share/lib/hive/ </description> </property> http://git-wip-us.apache.org/repos/asf/oozie/blob/25a8b99d/core/src/test/java/org/apache/oozie/service/TestShareLibMappingFileInput.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestShareLibMappingFileInput.java b/core/src/test/java/org/apache/oozie/service/TestShareLibMappingFileInput.java new file mode 100644 index 0000000..92fec9b --- /dev/null +++ b/core/src/test/java/org/apache/oozie/service/TestShareLibMappingFileInput.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.service; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.util.FSUtils; + +import java.io.IOException; + +public class TestShareLibMappingFileInput { + final FileSystem fs; + final String sharelibNameWithMappingFilePrefix; + final String sharelibName; + final String sharelibPath; + final String baseName; + + public TestShareLibMappingFileInput(final FileSystem fs, + final String sharelibMappingActionName, + final String sharelibPath) { + this.fs = fs; + this.sharelibName = sharelibMappingActionName; + this.sharelibNameWithMappingFilePrefix = ShareLibService.SHARE_LIB_CONF_PREFIX + "." + + sharelibMappingActionName; + this.sharelibPath = sharelibPath; + this.baseName = sharelibPath.substring(sharelibPath.lastIndexOf(Path.SEPARATOR) + 1); + } + + public void materialize() throws IOException { + TestShareLibService.createFileWithDirectoryPath(fs, sharelibPath); + } + + String getFullShareLibPathDir() { + String fullShareLibPathDir = getLocalizedShareLibPath(fs, sharelibPath); + return fullShareLibPathDir.substring(0, fullShareLibPathDir.lastIndexOf(Path.SEPARATOR)); + } + + static String getLocalizedShareLibPath(FileSystem fs, String path) { + return (fs instanceof LocalFileSystem) + ? FSUtils.FILE_SCHEME_PREFIX + "//" + path + : path; + } +} + +class TestShareLibMappingSymlinkInput extends TestShareLibMappingFileInput { + final String symlinkPath; + + public TestShareLibMappingSymlinkInput(final FileSystem fs, + final String sharelibActionName, + final String sharelibPath, + final String symlinkPath) { + super(fs, sharelibActionName, sharelibPath); + this.symlinkPath = symlinkPath; + } + + @Override + public void materialize() throws IOException { + super.materialize(); + FSUtils.createSymlink(fs, new Path(sharelibPath), new Path(symlinkPath), true); + } + + @Override + String getFullShareLibPathDir() { + int lastIndexOfPathSeparator = sharelibPath.lastIndexOf(Path.SEPARATOR); + String symlinkTargetFileName = (lastIndexOfPathSeparator != -1) + ? sharelibPath.substring(lastIndexOfPathSeparator + 1) + : sharelibPath; + String fullShareLibPathDir = getLocalizedShareLibPath(fs, symlinkPath); + return fullShareLibPathDir + "#" + symlinkTargetFileName; + } +}