This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c22ef  [GOBBLIN-1372] Generalization of 
GobblinClusterUtils#setSystemProperties
26c22ef is described below

commit 26c22efedd1f9c6b6893ec495c11d9bcbba8dc2e
Author: Lei Sun <[email protected]>
AuthorDate: Tue Jan 26 14:47:27 2021 -0800

    [GOBBLIN-1372] Generalization of GobblinClusterUtils#setSystemProperties
    
    Closes #3213 from autumnust/GOBBLIN-1372
---
 .gitignore                                         |  8 ++
 .../gobblin/cluster/GobblinClusterUtils.java       | 96 +++++++++++++++++++---
 .../gobblin/cluster/GobblinClusterUtilsTest.java   | 20 +++++
 3 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/.gitignore b/.gitignore
index afa05f6..7748935 100644
--- a/.gitignore
+++ b/.gitignore
@@ -74,6 +74,13 @@ ligradle/checkstyle/suppressions.xml
 gobblin-core/src/test/resources/serde/output-staging/
 gobblin-integration-test-log-dir/
 gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
+gobblin-modules/*/bin/
+gobblin-metrics-libs/*/bin/
+gobblin-config-management/*/bin/
+gobblin-cluster/GobblinHelixJobLauncherTest/
+gobblin-restli/gobblin-restli-utils/bin/
+gobblin-test-harness/gobblin-test-harness/
+
 
 temp/
 ligradle/*
@@ -87,3 +94,4 @@ gobblin-integration-test-work-dir/
 gobblin-test-utils/src/main/gen-avro/
 gobblin-test-utils/src/main/gen-proto/
 
+gobblin_config/
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 3c78751..828906b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -21,14 +21,22 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
+import com.google.api.client.util.Lists;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -46,10 +54,60 @@ import org.apache.gobblin.util.PathUtils;
 @Alpha
 @Slf4j
 public class GobblinClusterUtils {
-  public static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
+  static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
+  /**
+   * This template will be resolved by replacing "$VALUE" with a value that Gobblin recognizes.
+   * For more details, check {@link 
GobblinClusterUtils#setSystemProperties(Config)}
+   */
+  private static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE
+      = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + 
"systemPropertiesList.$VALUE";
+
+  /**
+   * This enum is used for specifying JVM options that Gobblin-Cluster will 
set whose value will need to be obtained
+   * in JVM runtime.
+   * e.g. YARN_CACHE will be used by Gobblin-on-YARN (an extension of Gobblin-Cluster) and resolved to a YARN-specific
+   * temporary location internal to the application.
+   *
+   * Note that we could specify a couple of keys associated with the value, 
meaning the value should only be resolved
+   * to associated keys but nothing else to avoid abusive usage. Users could 
also set resolved
+   * {@link #GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE} to expand default 
associated-key list.
+   *
+   * e.g. setting `gobblin.cluster.systemPropertiesList.YARN_CACHE` = [a,b] 
expands the associated-key list to
+   * [java.io.tmpdir, a, b]. Setting
+   * {@link GobblinClusterConfigurationKeys#GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX}.${keyName}=YARN_CACHE
+   * resolves to -D${keyName}=resolvedValue(YARN_CACHE) only when ${keyName} is found in the associated-key list.
+   */
+  public enum JVM_ARG_VALUE_RESOLVER {
+    YARN_CACHE {
+      @Override
+      public List<String> getAssociatedKeys() {
+        return yarnCacheAssociatedKeys;
+      }
+
+      @Override
+      public String getResolution() {
+        //When keys like java.io.tmpdir is configured to "YARN_CACHE", it sets 
the tmp dir to the Yarn container's cache location.
+        // This setting will only be useful when the cluster is deployed in 
Yarn mode.
+        return System.getenv(ApplicationConstants.Environment.PWD.key());
+      }
+    };
+
+    // Kept for backward-compatibility
+    private static List<String> yarnCacheAssociatedKeys = 
ImmutableList.of(JAVA_TMP_DIR_KEY);
 
-  public enum TMP_DIR {
-    YARN_CACHE
+    // default associated key with the value.
+    public abstract List<String> getAssociatedKeys() ;
+
+    public abstract String getResolution();
+
+    public static boolean contains(String value) {
+      for (JVM_ARG_VALUE_RESOLVER v : JVM_ARG_VALUE_RESOLVER.values()) {
+        if (v.name().equalsIgnoreCase(value)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   /**
@@ -122,23 +180,52 @@ public class GobblinClusterUtils {
    * @param config
    */
   public static void setSystemProperties(Config config) {
-    Properties properties = ConfigUtils.configToProperties(ConfigUtils.getConfig(config, GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX,
-        ConfigFactory.empty()));
+    Properties properties = ConfigUtils.configToProperties(ConfigUtils.getConfig(config,
+        GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX, ConfigFactory.empty()));
 
     for (Map.Entry<Object, Object> entry: properties.entrySet()) {
-      if (entry.getKey().toString().equals(JAVA_TMP_DIR_KEY)) {
-        if (entry.getValue().toString().equalsIgnoreCase(TMP_DIR.YARN_CACHE.toString())) {
-          //When java.io.tmpdir is configured to "YARN_CACHE", it sets the tmp dir to the Yarn container's cache location.
-          // This setting will only be useful when the cluster is deployed in Yarn mode.
-          log.info("Setting tmp directory to: {}", System.getenv(ApplicationConstants.Environment.PWD.key()));
-          System.setProperty(entry.getKey().toString(), System.getenv(ApplicationConstants.Environment.PWD.key()));
+      if (JVM_ARG_VALUE_RESOLVER.contains(entry.getValue().toString())) {
+        // contains() matches case-insensitively whereas valueOf() is case-sensitive, so normalize to the
+        // constant name first; otherwise a value such as "yarn_cache" passes the check and then throws.
+        String resolverName = entry.getValue().toString().toUpperCase(java.util.Locale.ROOT);
+        JVM_ARG_VALUE_RESOLVER enumMember = JVM_ARG_VALUE_RESOLVER.valueOf(resolverName);
+        // Only keys registered for this resolver (defaults plus the configured extension list) get resolved.
+        List<String> allowedKeys = new ArrayList<>(enumMember.getAssociatedKeys());
+        allowedKeys.addAll(getAdditionalKeys(enumMember.name(), config));
+
+        if (allowedKeys.contains(entry.getKey().toString())) {
+          String resolvedValue = enumMember.getResolution();
+          log.info("Setting system property {} to: {}", entry.getKey(), resolvedValue);
+          System.setProperty(entry.getKey().toString(), resolvedValue);
           continue;
+        } else {
+          log.warn("String {} not being registered for dynamic JVM-arg resolution, "
+              + "considering add it by setting extension key", entry.getKey());
         }
       }
+      // Plain values, and resolver names used with an unregistered key, are set verbatim.
       System.setProperty(entry.getKey().toString(), entry.getValue().toString());
     }
   }
 
+  /**
+   * Reads the user-configured extension of the associated-key list for a resolver value.
+   *
+   * @param value name of the resolver value, substituted for "$VALUE" in the list-key template
+   * @param config config that may hold a comma-separated key list under the resolved key
+   * @return the trimmed, non-empty keys from that list, or an empty collection when the path is absent
+   */
+  private static Collection<String> getAdditionalKeys(String value, Config config) {
+    String resolvedKey = GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE.replace("$VALUE", value);
+    if (config.hasPath(resolvedKey)) {
+      return StreamSupport.stream(
+          Splitter.on(",").trimResults().omitEmptyStrings().split(config.getString(resolvedKey)).spliterator(), false
+      ).collect(Collectors.toList());
+    } else {
+      return Lists.newArrayList();
+    }
+  }
+
   /**
    * Get the dynamic config from a {@link DynamicConfigGenerator}
    * @param config input config
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index 4f58b2e..5ff1a3a 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -31,6 +31,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.util.PathUtils;
 
+import static org.apache.gobblin.cluster.GobblinClusterUtils.JAVA_TMP_DIR_KEY;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -77,6 +78,25 @@ public class GobblinClusterUtilsTest {
     Assert.assertEquals(System.getProperty("prop1"), "val1");
     Assert.assertEquals(System.getProperty("prop2"), "val2");
     Assert.assertEquals(System.getProperty("prop3"), "val3");
+
+    // Test specifically for key resolution using YARN_CACHE as the example.
+    config = 
config.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
 + "." +
+        JAVA_TMP_DIR_KEY, 
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+        
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
 + ".randomKey1",
+            
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+        
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
 + ".randomKey2",
+            
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+        
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
 + ".rejectedKey",
+            
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+        .withValue("gobblin.cluster.systemPropertiesList.YARN_CACHE", 
ConfigValueFactory.fromAnyRef("randomKey1,randomKey2"));
+    GobblinClusterUtils.setSystemProperties(config);
+    Assert.assertEquals(System.getProperty(JAVA_TMP_DIR_KEY), 
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
+    Assert.assertEquals(System.getProperty("randomKey1"), 
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
+    Assert.assertEquals(System.getProperty("randomKey2"), 
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
+    // For keys not added to the list of `gobblin.cluster.systemPropertiesList.YARN_CACHE`, the value won't
+    // be resolved.
+    Assert.assertEquals(System.getProperty("rejectedKey"), 
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name());
+
   }
 
 }

Reply via email to