Repository: samza Updated Branches: refs/heads/master b5e45b7ec -> c1b778a01
SAMZA-1143; Include fs.<scheme>.impl.* subkeys to YarnConfiguration used in YarnJobFactory and YarnClusterResourceManager SAMZA-1143 Include fs.<scheme>.impl.* subkeys, in addition to fs.<scheme>.impl, in the YarnConfiguration used in YarnJobFactory and YarnClusterResourceManager. When there are additional sub-configurations under fs.myScheme.impl, such as fs.myScheme.impl.client, we need to keep the complete set of configurations in YarnJobFactory and YarnClusterResourceManager. When the context is set up for localizing the resource in ClientHelper and YarnContainerRunner, it may rely on this configuration to get the FileStatus information, which may or may not depend on fs.<scheme>.impl and all possible fs.<scheme>.impl.* sub-configurations. This enhances the feature introduced in PR #90. Author: Fred Ji <f...@linkedin.com> Author: vjagadish1989 <jvenk...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #97 from fredji97/fsImplSubkeys Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c1b778a0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c1b778a0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c1b778a0 Branch: refs/heads/master Commit: c1b778a01277d41e538f3d467d81cd8e69e033a5 Parents: b5e45b7 Author: Fred Ji <f...@linkedin.com> Authored: Wed Apr 5 22:28:38 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Wed Apr 5 22:28:38 2017 -0700 ---------------------------------------------------------------------- .../samza/job/yarn/FileSystemImplConfig.java | 31 ++++++------------ .../job/yarn/YarnClusterResourceManager.java | 8 +++-- .../apache/samza/job/yarn/YarnJobFactory.scala | 8 +++-- .../job/yarn/TestFileSystemImplConfig.java | 33 +++++++++++++------- .../samza/job/yarn/TestYarnJobFactory.java | 11 +++++++ 5 files changed, 54 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java index 8e79104..7e10f4f 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java @@ -20,6 +20,7 @@ package org.apache.samza.job.yarn; import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.samza.config.Config; import org.slf4j.Logger; @@ -31,10 +32,9 @@ import org.slf4j.LoggerFactory; * e.g. fs.http.impl */ public class FileSystemImplConfig { - private static final Logger log = LoggerFactory.getLogger(FileSystemImplConfig.class); private static final String FS_IMPL_PREFIX = "fs."; private static final String FS_IMPL_SUFFIX = ".impl"; - private static final String FS_IMPL = "fs.%s.impl"; + private static final String FS_IMPL_TEMPLATE = "fs.%s.impl"; private final Config config; @@ -61,26 +61,15 @@ public class FileSystemImplConfig { } /** - * Get the fs.<scheme>impl as the config key from scheme + * Get the config subset for fs.<scheme>.impl + * It can include config for fs.<scheme>.impl and additional config for the subKeys fs.<scheme>.impl.* from the configuration + * e.g. 
for scheme "myScheme", there could be config for fs.myScheme.impl, fs.myScheme.impl.client and fs.myScheme.impl.server * @param scheme scheme name, such as http, hdfs, myscheme - * @return fs.<scheme>impl + * @return config for the particular scheme */ - public String getFsImplKey(final String scheme) { - String fsImplKey = String.format(FS_IMPL, scheme); - return fsImplKey; - } - - /** - * Get the class name corresponding for the given scheme - * @param scheme scheme name, such as http, hdfs, myscheme - * @return full scoped class name for the file system for <scheme> - */ - public String getFsImplClassName(final String scheme) { - String fsImplKey = getFsImplKey(scheme); - String fsImplClassName = config.get(fsImplKey); - if (StringUtils.isEmpty(fsImplClassName)) { - throw new LocalizerResourceException(fsImplKey + " does not have configured class implementation"); - } - return fsImplClassName; + public Config getSchemeConfig(final String scheme) { + String fsSchemeImpl = String.format(FS_IMPL_TEMPLATE, scheme); + Config schemeConfig = config.subset(fsSchemeImpl, false); // do not strip off the prefix + return schemeConfig; } } http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 1fd3939..ae171c7 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -119,10 +119,14 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement hConfig = new YarnConfiguration(); hConfig.set("fs.http.impl", HttpFileSystem.class.getName()); - // Use the Samza job config "fs.<scheme>.impl" 
to override YarnConfiguration + // Use the Samza job config "fs.<scheme>.impl" and "fs.<scheme>.impl.*" for YarnConfiguration FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config); fsImplConfig.getSchemes().forEach( - scheme -> hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme)) + scheme -> { + fsImplConfig.getSchemeConfig(scheme).forEach( + (confKey, confValue) -> hConfig.set(confKey, confValue) + ); + } ); MetricsRegistryMap registry = new MetricsRegistryMap(); http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala index 2d8a3f1..9433cda 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala @@ -40,10 +40,14 @@ class YarnJobFactory extends StreamJobFactory with Logging { hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032")) } - // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration + // Use the Samza job config "fs.<scheme>.impl" and "fs.<scheme>.impl.*" for YarnConfiguration val fsImplConfig = new FileSystemImplConfig(config) fsImplConfig.getSchemes.asScala.foreach( - (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme)) + scheme => { + fsImplConfig.getSchemeConfig(scheme).asScala.foreach { + case(confKey, confValue) => hConfig.set(confKey, confValue) + } + } ) new YarnJob(config, hConfig) http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java 
---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java index 6e11c66..63d83df 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java @@ -18,6 +18,8 @@ */ package org.apache.samza.job.yarn; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import org.apache.samza.config.Config; @@ -45,12 +47,6 @@ public class TestFileSystemImplConfig { assertEquals(2, manager.getSchemes().size()); assertEquals("http", manager.getSchemes().get(0)); assertEquals("myscheme", manager.getSchemes().get(1)); - - assertEquals("fs.http.impl", manager.getFsImplKey("http")); - assertEquals("fs.myscheme.impl", manager.getFsImplKey("myscheme")); - - assertEquals("org.apache.samza.HttpFileSystem", manager.getFsImplClassName("http")); - assertEquals("org.apache.samza.MySchemeFileSystem", manager.getFsImplClassName("myscheme")); } @Test @@ -61,15 +57,28 @@ public class TestFileSystemImplConfig { } @Test - public void testEmptyImpl() { - thrown.expect(LocalizerResourceException.class); - thrown.expectMessage("fs.http.impl does not have configured class implementation"); - + public void testSchemeWithSubkeys() { Map<String, String> configMap = new HashMap<>(); - configMap.put("fs.http.impl", ""); + configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem"); + configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem"); + configMap.put("fs.http.impl.key1", "val1"); + configMap.put("fs.http.impl.key2", "val2"); Config conf = new MapConfig(configMap); FileSystemImplConfig manager = new FileSystemImplConfig(conf); - manager.getFsImplClassName("http"); + + Map<String, String> expectedFsHttpImplConfs = 
ImmutableMap.of( //Scheme with additional subkeys + "fs.http.impl", "org.apache.samza.HttpFileSystem", + "fs.http.impl.key1", "val1", + "fs.http.impl.key2", "val2" + ); + + Map<String, String> expectedFsMyschemeImplConfs = ImmutableMap.of( // Scheme without subkeys + "fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem" + ); + + assertEquals(Arrays.asList("http", "myscheme"), manager.getSchemes()); + assertEquals(expectedFsHttpImplConfs, manager.getSchemeConfig("http")); + assertEquals(expectedFsMyschemeImplConfs, manager.getSchemeConfig("myscheme")); } } http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java index 11077f0..12d45f5 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java @@ -45,4 +45,15 @@ public class TestYarnJobFactory { assertEquals("org.apache.myHttp", hConfig.get("fs.http.impl")); assertEquals("org.apache.myScheme", hConfig.get("fs.myscheme.impl")); } + + @Test + public void testGetJobWithFsImplSubkeys() { + YarnJobFactory jobFactory = new YarnJobFactory(); + YarnJob yarnJob = jobFactory.getJob(new MapConfig(ImmutableMap.of( + "fs.myscheme.impl","org.apache.myScheme", + "fs.myscheme.impl.client","org.apache.mySchemeClient"))); + Configuration hConfig = yarnJob.client().yarnClient().getConfig(); + assertEquals("org.apache.myScheme", hConfig.get("fs.myscheme.impl")); + assertEquals("org.apache.mySchemeClient", hConfig.get("fs.myscheme.impl.client")); + } }