This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 26c22ef [GOBBLIN-1372] Generalization of
GobblinClusterUtils#setSystemProperties
26c22ef is described below
commit 26c22efedd1f9c6b6893ec495c11d9bcbba8dc2e
Author: Lei Sun <[email protected]>
AuthorDate: Tue Jan 26 14:47:27 2021 -0800
[GOBBLIN-1372] Generalization of GobblinClusterUtils#setSystemProperties
Closes #3213 from autumnust/GOBBLIN-1372
---
.gitignore | 8 ++
.../gobblin/cluster/GobblinClusterUtils.java | 96 +++++++++++++++++++---
.../gobblin/cluster/GobblinClusterUtilsTest.java | 20 +++++
3 files changed, 113 insertions(+), 11 deletions(-)
diff --git a/.gitignore b/.gitignore
index afa05f6..7748935 100644
--- a/.gitignore
+++ b/.gitignore
@@ -74,6 +74,13 @@ ligradle/checkstyle/suppressions.xml
gobblin-core/src/test/resources/serde/output-staging/
gobblin-integration-test-log-dir/
gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
+gobblin-modules/*/bin/
+gobblin-metrics-libs/*/bin/
+gobblin-config-management/*/bin/
+gobblin-cluster/GobblinHelixJobLauncherTest/
+gobblin-restli/gobblin-restli-utils/bin/
+gobblin-test-harness/gobblin-test-harness/
+
temp/
ligradle/*
@@ -87,3 +94,4 @@ gobblin-integration-test-work-dir/
gobblin-test-utils/src/main/gen-avro/
gobblin-test-utils/src/main/gen-proto/
+gobblin_config/
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 3c78751..828906b 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -21,14 +21,22 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import com.google.api.client.util.Lists;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -46,10 +54,60 @@ import org.apache.gobblin.util.PathUtils;
@Alpha
@Slf4j
public class GobblinClusterUtils {
- public static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
+ static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
+ /**
+ * This template is resolved by replacing "$VALUE" with a value that
Gobblin recognizes.
+ * For more details, check {@link
GobblinClusterUtils#setSystemProperties(Config)}
+ */
+ private static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE
+ = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX +
"systemPropertiesList.$VALUE";
+
+ /**
+ * This enum is used for specifying JVM options that Gobblin-Cluster will
set whose value will need to be obtained
+ * at JVM runtime.
+ * e.g. YARN_CACHE will be used by Gobblin-on-YARN (an extension of
Gobblin-Cluster) and resolved to a YARN-specific
+ * temporary location internal to the application.
+ *
+ * Note that each value declares a list of associated keys, and the value is
resolved only for those keys
+ * and nothing else, to avoid abusive usage. Users can also set the key obtained
by resolving
+ * {@link #GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE} to expand the default
associated-key list.
+ *
+ * e.g. setting `gobblin.cluster.systemPropertiesList.YARN_CACHE` = [a,b]
expands the associated-key list to
+ * [java.io.tmpdir, a, b]. Only when a key is found in the associated-key
list will setting
+ * {@link
GobblinClusterConfigurationKeys#GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX}.${keyName}=YARN_CACHE
trigger the
+ * resolution -D${keyName} = resolvedValue(YARN_CACHE).
+ */
+ public enum JVM_ARG_VALUE_RESOLVER {
+ YARN_CACHE {
+ @Override
+ public List<String> getAssociatedKeys() {
+ return yarnCacheAssociatedKeys;
+ }
+
+ @Override
+ public String getResolution() {
+ //When a key like java.io.tmpdir is configured to "YARN_CACHE", it sets
the tmp dir to the Yarn container's cache location.
+ // This setting will only be useful when the cluster is deployed in
Yarn mode.
+ return System.getenv(ApplicationConstants.Environment.PWD.key());
+ }
+ };
+
+ // Kept for backward-compatibility
+ private static List<String> yarnCacheAssociatedKeys =
ImmutableList.of(JAVA_TMP_DIR_KEY);
- public enum TMP_DIR {
- YARN_CACHE
+ // Default keys associated with the value.
+ public abstract List<String> getAssociatedKeys() ;
+
+ public abstract String getResolution();
+
+ public static boolean contains(String value) {
+ for (JVM_ARG_VALUE_RESOLVER v : JVM_ARG_VALUE_RESOLVER.values()) {
+ if (v.name().equalsIgnoreCase(value)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
/**
@@ -122,23 +180,39 @@ public class GobblinClusterUtils {
* @param config
*/
public static void setSystemProperties(Config config) {
- Properties properties =
ConfigUtils.configToProperties(ConfigUtils.getConfig(config,
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX,
- ConfigFactory.empty()));
+ Properties properties =
ConfigUtils.configToProperties(ConfigUtils.getConfig(config,
+
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX,
ConfigFactory.empty()));
for (Map.Entry<Object, Object> entry: properties.entrySet()) {
- if (entry.getKey().toString().equals(JAVA_TMP_DIR_KEY)) {
- if
(entry.getValue().toString().equalsIgnoreCase(TMP_DIR.YARN_CACHE.toString())) {
- //When java.io.tmpdir is configured to "YARN_CACHE", it sets the tmp
dir to the Yarn container's cache location.
- // This setting will only be useful when the cluster is deployed in
Yarn mode.
- log.info("Setting tmp directory to: {}",
System.getenv(ApplicationConstants.Environment.PWD.key()));
- System.setProperty(entry.getKey().toString(),
System.getenv(ApplicationConstants.Environment.PWD.key()));
+ if (JVM_ARG_VALUE_RESOLVER.contains(entry.getValue().toString())) {
+ JVM_ARG_VALUE_RESOLVER enumMember =
JVM_ARG_VALUE_RESOLVER.valueOf(entry.getValue().toString());
+ List<String> allowedKeys = new
ArrayList<>(enumMember.getAssociatedKeys());
+ allowedKeys.addAll(getAdditionalKeys(entry.getValue().toString(),
config));
+
+ if (allowedKeys.contains(entry.getKey().toString())) {
+ log.info("Setting tmp directory to: {}", enumMember.getResolution());
+ System.setProperty(entry.getKey().toString(),
enumMember.getResolution());
continue;
+ } else {
+ log.warn("String {} not being registered for dynamic JVM-arg
resolution, "
+ + "considering add it by setting extension key", entry.getKey());
}
}
System.setProperty(entry.getKey().toString(),
entry.getValue().toString());
}
}
+ private static Collection<String> getAdditionalKeys(String value, Config
config) {
+ String resolvedKey =
GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE.replace("$VALUE", value);
+ if (config.hasPath(resolvedKey)) {
+ return StreamSupport.stream(
+
Splitter.on(",").trimResults().omitEmptyStrings().split(config.getString(resolvedKey)).spliterator(),
false
+ ).collect(Collectors.toList());
+ } else {
+ return Lists.newArrayList();
+ }
+ }
+
/**
* Get the dynamic config from a {@link DynamicConfigGenerator}
* @param config input config
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index 4f58b2e..5ff1a3a 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -31,6 +31,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.util.PathUtils;
+import static org.apache.gobblin.cluster.GobblinClusterUtils.JAVA_TMP_DIR_KEY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -77,6 +78,25 @@ public class GobblinClusterUtilsTest {
Assert.assertEquals(System.getProperty("prop1"), "val1");
Assert.assertEquals(System.getProperty("prop2"), "val2");
Assert.assertEquals(System.getProperty("prop3"), "val3");
+
+ // Test specifically for key resolution using YARN_CACHE as the example.
+ config =
config.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ "." +
+ JAVA_TMP_DIR_KEY,
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".randomKey1",
+
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".randomKey2",
+
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".rejectedKey",
+
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
+ .withValue("gobblin.cluster.systemPropertiesList.YARN_CACHE",
ConfigValueFactory.fromAnyRef("randomKey1,randomKey2"));
+ GobblinClusterUtils.setSystemProperties(config);
+ Assert.assertEquals(System.getProperty(JAVA_TMP_DIR_KEY),
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
+ Assert.assertEquals(System.getProperty("randomKey1"),
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
+ Assert.assertEquals(System.getProperty("randomKey2"),
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
+ // For keys not being added in the list of
`gobblin.cluster.systemPropertiesList.YARN_CACHE`, the value won't
+ // be resolved.
+ Assert.assertEquals(System.getProperty("rejectedKey"),
GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name());
+
}
}