Repository: samza
Updated Branches:
  refs/heads/master b5e45b7ec -> c1b778a01


SAMZA-1143; Include fs.<scheme>.impl.* subkeys in the YarnConfiguration used by
YarnJobFactory and YarnClusterResourceManager

SAMZA-1143 Include fs.<scheme>.impl.* subkeys, in addition to
fs.<scheme>.impl, in the YarnConfiguration used by YarnJobFactory and
YarnClusterResourceManager.

When there are additional sub-configurations under fs.myScheme.impl, such as
fs.myScheme.impl.client, the complete set of these configurations must be
propagated to the YarnConfiguration built in YarnJobFactory and
YarnClusterResourceManager. When the resource-localization context is set up
in ClientHelper and YarnContainerRunner, it may rely on this configuration to
obtain FileStatus information, which may depend on fs.<scheme>.impl and on
any of the fs.<scheme>.impl.* sub-configurations.

This enhances the feature introduced in PR #90.

Author: Fred Ji <f...@linkedin.com>
Author: vjagadish1989 <jvenk...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #97 from fredji97/fsImplSubkeys


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c1b778a0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c1b778a0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c1b778a0

Branch: refs/heads/master
Commit: c1b778a01277d41e538f3d467d81cd8e69e033a5
Parents: b5e45b7
Author: Fred Ji <f...@linkedin.com>
Authored: Wed Apr 5 22:28:38 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Wed Apr 5 22:28:38 2017 -0700

----------------------------------------------------------------------
 .../samza/job/yarn/FileSystemImplConfig.java    | 31 ++++++------------
 .../job/yarn/YarnClusterResourceManager.java    |  8 +++--
 .../apache/samza/job/yarn/YarnJobFactory.scala  |  8 +++--
 .../job/yarn/TestFileSystemImplConfig.java      | 33 +++++++++++++-------
 .../samza/job/yarn/TestYarnJobFactory.java      | 11 +++++++
 5 files changed, 54 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
index 8e79104..7e10f4f 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java
@@ -20,6 +20,7 @@ package org.apache.samza.job.yarn;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 import org.apache.samza.config.Config;
 import org.slf4j.Logger;
@@ -31,10 +32,9 @@ import org.slf4j.LoggerFactory;
  * e.g. fs.http.impl
  */
 public class FileSystemImplConfig {
-  private static final Logger log = 
LoggerFactory.getLogger(FileSystemImplConfig.class);
   private static final String FS_IMPL_PREFIX = "fs.";
   private static final String FS_IMPL_SUFFIX = ".impl";
-  private static final String FS_IMPL = "fs.%s.impl";
+  private static final String FS_IMPL_TEMPLATE = "fs.%s.impl";
 
   private final Config config;
 
@@ -61,26 +61,15 @@ public class FileSystemImplConfig {
   }
 
   /**
-   * Get the fs.&lt;scheme&gt;impl as the config key from scheme
+   * Get the config subset for fs.&lt;scheme&gt;.impl
+   * It can include config for fs.&lt;scheme&gt;.impl and additional config 
for the subKeys fs.&lt;scheme&gt;.impl.* from the configuration
+   * e.g. for scheme "myScheme", there could be config for fs.myScheme.impl, 
fs.myScheme.impl.client and fs.myScheme.impl.server
    * @param scheme scheme name, such as http, hdfs, myscheme
-   * @return fs.&lt;scheme&gt;impl
+   * @return config for the particular scheme
    */
-  public String getFsImplKey(final String scheme) {
-    String fsImplKey = String.format(FS_IMPL, scheme);
-    return fsImplKey;
-  }
-
-  /**
-   * Get the class name corresponding for the given scheme
-   * @param scheme scheme name, such as http, hdfs, myscheme
-   * @return full scoped class name for the file system for &lt;scheme&gt;
-   */
-  public String getFsImplClassName(final String scheme) {
-    String fsImplKey = getFsImplKey(scheme);
-    String fsImplClassName = config.get(fsImplKey);
-    if (StringUtils.isEmpty(fsImplClassName)) {
-      throw new LocalizerResourceException(fsImplKey + " does not have 
configured class implementation");
-    }
-    return fsImplClassName;
+  public Config getSchemeConfig(final String scheme) {
+    String fsSchemeImpl = String.format(FS_IMPL_TEMPLATE, scheme);
+    Config schemeConfig = config.subset(fsSchemeImpl, false); // do not strip 
off the prefix
+    return schemeConfig;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 1fd3939..ae171c7 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -119,10 +119,14 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     hConfig = new YarnConfiguration();
     hConfig.set("fs.http.impl", HttpFileSystem.class.getName());
 
-    // Use the Samza job config "fs.<scheme>.impl" to override 
YarnConfiguration
+    // Use the Samza job config "fs.<scheme>.impl" and "fs.<scheme>.impl.*" 
for YarnConfiguration
     FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config);
     fsImplConfig.getSchemes().forEach(
-        scheme -> hConfig.set(fsImplConfig.getFsImplKey(scheme), 
fsImplConfig.getFsImplClassName(scheme))
+        scheme -> {
+          fsImplConfig.getSchemeConfig(scheme).forEach(
+              (confKey, confValue) -> hConfig.set(confKey, confValue)
+          );
+        }
     );
 
     MetricsRegistryMap registry = new MetricsRegistryMap();

http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
index 2d8a3f1..9433cda 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
@@ -40,10 +40,14 @@ class YarnJobFactory extends StreamJobFactory with Logging {
       hConfig.set(YarnConfiguration.RM_ADDRESS, 
config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032"))
     }
 
-    // Use the Samza job config "fs.<scheme>.impl" to override 
YarnConfiguration
+    // Use the Samza job config "fs.<scheme>.impl" and "fs.<scheme>.impl.*" 
for YarnConfiguration
     val fsImplConfig = new FileSystemImplConfig(config)
     fsImplConfig.getSchemes.asScala.foreach(
-      (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), 
fsImplConfig.getFsImplClassName(scheme))
+      scheme => {
+        fsImplConfig.getSchemeConfig(scheme).asScala.foreach {
+          case(confKey, confValue) => hConfig.set(confKey, confValue)
+        }
+      }
     )
 
     new YarnJob(config, hConfig)

http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
index 6e11c66..63d83df 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.job.yarn;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
@@ -45,12 +47,6 @@ public class TestFileSystemImplConfig {
     assertEquals(2, manager.getSchemes().size());
     assertEquals("http", manager.getSchemes().get(0));
     assertEquals("myscheme", manager.getSchemes().get(1));
-
-    assertEquals("fs.http.impl", manager.getFsImplKey("http"));
-    assertEquals("fs.myscheme.impl", manager.getFsImplKey("myscheme"));
-
-    assertEquals("org.apache.samza.HttpFileSystem", 
manager.getFsImplClassName("http"));
-    assertEquals("org.apache.samza.MySchemeFileSystem", 
manager.getFsImplClassName("myscheme"));
   }
 
   @Test
@@ -61,15 +57,28 @@ public class TestFileSystemImplConfig {
   }
 
   @Test
-  public void testEmptyImpl() {
-    thrown.expect(LocalizerResourceException.class);
-    thrown.expectMessage("fs.http.impl does not have configured class 
implementation");
-
+  public void testSchemeWithSubkeys() {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put("fs.http.impl", "");
+    configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
+    configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem");
+    configMap.put("fs.http.impl.key1", "val1");
+    configMap.put("fs.http.impl.key2", "val2");
     Config conf = new MapConfig(configMap);
 
     FileSystemImplConfig manager = new FileSystemImplConfig(conf);
-    manager.getFsImplClassName("http");
+
+    Map<String, String> expectedFsHttpImplConfs = ImmutableMap.of( //Scheme 
with additional subkeys
+        "fs.http.impl", "org.apache.samza.HttpFileSystem",
+        "fs.http.impl.key1", "val1",
+        "fs.http.impl.key2", "val2"
+    );
+
+    Map<String, String> expectedFsMyschemeImplConfs = ImmutableMap.of( // 
Scheme without subkeys
+        "fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem"
+    );
+
+    assertEquals(Arrays.asList("http", "myscheme"), manager.getSchemes());
+    assertEquals(expectedFsHttpImplConfs, manager.getSchemeConfig("http"));
+    assertEquals(expectedFsMyschemeImplConfs, 
manager.getSchemeConfig("myscheme"));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1b778a0/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
index 11077f0..12d45f5 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java
@@ -45,4 +45,15 @@ public class TestYarnJobFactory {
     assertEquals("org.apache.myHttp", hConfig.get("fs.http.impl"));
     assertEquals("org.apache.myScheme", hConfig.get("fs.myscheme.impl"));
   }
+
+  @Test
+  public void  testGetJobWithFsImplSubkeys() {
+    YarnJobFactory jobFactory = new YarnJobFactory();
+    YarnJob yarnJob = jobFactory.getJob(new MapConfig(ImmutableMap.of(
+        "fs.myscheme.impl","org.apache.myScheme",
+        "fs.myscheme.impl.client","org.apache.mySchemeClient")));
+    Configuration hConfig = yarnJob.client().yarnClient().getConfig();
+    assertEquals("org.apache.myScheme", hConfig.get("fs.myscheme.impl"));
+    assertEquals("org.apache.mySchemeClient", 
hConfig.get("fs.myscheme.impl.client"));
+  }
 }

Reply via email to