This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cc33043 [GOBBLIN-1209] Provide an option to configure the java tmp
dir to the Yarn cache location
cc33043 is described below
commit cc33043a4319901da0a89f9ec304dd0283d8b56f
Author: sv2000 <[email protected]>
AuthorDate: Mon Jul 13 14:10:35 2020 -0700
[GOBBLIN-1209] Provide an option to configure the java tmp dir to the Yarn cache location
Closes #3056 from sv2000/javaTmpDir
---
.../gobblin/cluster/GobblinClusterManager.java | 20 +++++++------
.../gobblin/cluster/GobblinClusterUtils.java | 35 +++++++++++++++++++---
.../apache/gobblin/cluster/GobblinTaskRunner.java | 13 ++++----
.../org/apache/gobblin/cluster/HelixUtils.java | 18 -----------
.../gobblin/cluster/GobblinClusterUtilsTest.java | 19 ++++++++++++
.../org/apache/gobblin/cluster/HelixUtilsTest.java | 18 -----------
.../gobblin/yarn/GobblinApplicationMaster.java | 11 ++-----
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 6 ++--
8 files changed, 75 insertions(+), 65 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 011db74..c2415e6 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -142,23 +142,25 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
public GobblinClusterManager(String clusterName, String applicationId,
Config sysConfig,
Optional<Path> appWorkDirOptional) throws Exception {
+ // Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
+ // overrides such as sessionTimeout. In this case, the overrides specified
+ // in the application configuration have to be extracted and set before
initializing HelixManager.
+ GobblinClusterUtils.setSystemProperties(sysConfig);
+
+ //Add dynamic config
+ this.config = GobblinClusterUtils.addDynamicConfig(sysConfig);
+
this.clusterName = clusterName;
- this.config = sysConfig;
- this.isStandaloneMode = ConfigUtils.getBoolean(sysConfig,
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
+ this.isStandaloneMode = ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
this.applicationId = applicationId;
- // Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
- // overrides such as sessionTimeout. In this case, the overrides specified
- // in the application configuration have to be extracted and set before
initializing HelixManager.
- HelixUtils.setSystemProperties(sysConfig);
-
initializeHelixManager();
- this.fs = GobblinClusterUtils.buildFileSystem(sysConfig, new
Configuration());
+ this.fs = GobblinClusterUtils.buildFileSystem(this.config, new
Configuration());
this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
- : GobblinClusterUtils.getAppWorkDirPathFromConfig(sysConfig, this.fs,
clusterName, applicationId);
+ : GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config,
this.fs, clusterName, applicationId);
LOGGER.info("Configured GobblinClusterManager work dir to: {}",
this.appWorkDir);
initializeAppLauncherAndServices();
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index a7e4ad8..3c78751 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -21,12 +21,16 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
@@ -42,6 +46,11 @@ import org.apache.gobblin.util.PathUtils;
@Alpha
@Slf4j
public class GobblinClusterUtils {
+ public static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
+
+ public enum TMP_DIR {
+ YARN_CACHE
+ }
/**
* Get the name of the current host.
@@ -62,10 +71,6 @@ public class GobblinClusterUtils {
* @param applicationId the application ID in string form
* @return the cluster application working directory {@link Path}
*/
- public static Path getAppWorkDirPath(FileSystem fs, String applicationName,
String applicationId) {
- return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName,
applicationId));
- }
-
public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
String applicationName, String applicationId) {
if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
@@ -113,6 +118,28 @@ public class GobblinClusterUtils {
}
/**
+ * Set the system properties from the input {@link Config} instance
+ * @param config
+ */
+ public static void setSystemProperties(Config config) {
+ Properties properties =
ConfigUtils.configToProperties(ConfigUtils.getConfig(config,
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX,
+ ConfigFactory.empty()));
+
+ for (Map.Entry<Object, Object> entry: properties.entrySet()) {
+ if (entry.getKey().toString().equals(JAVA_TMP_DIR_KEY)) {
+ if
(entry.getValue().toString().equalsIgnoreCase(TMP_DIR.YARN_CACHE.toString())) {
+ //When java.io.tmpdir is configured to "YARN_CACHE", it sets the tmp
dir to the Yarn container's cache location.
+ // This setting will only be useful when the cluster is deployed in
Yarn mode.
+ log.info("Setting tmp directory to: {}",
System.getenv(ApplicationConstants.Environment.PWD.key()));
+ System.setProperty(entry.getKey().toString(),
System.getenv(ApplicationConstants.Environment.PWD.key()));
+ continue;
+ }
+ }
+ System.setProperty(entry.getKey().toString(),
entry.getValue().toString());
+ }
+ }
+
+ /**
* Get the dynamic config from a {@link DynamicConfigGenerator}
* @param config input config
* @return the dynamic config
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 08d4dd0..7045c29 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -183,6 +183,14 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
String taskRunnerId,
Config config,
Optional<Path> appWorkDirOptional) throws Exception {
+ // Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
+ // overrides such as sessionTimeout. In this case, the overrides specified
+ // in the application configuration have to be extracted and set before
initializing HelixManager.
+ GobblinClusterUtils.setSystemProperties(config);
+
+ //Add dynamic config
+ config = GobblinClusterUtils.addDynamicConfig(config);
+
this.isTaskDriver = ConfigUtils.getBoolean(config,
GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false);
this.helixInstanceName = helixInstanceName;
this.taskRunnerId = taskRunnerId;
@@ -206,11 +214,6 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
logger.info("Configured GobblinTaskRunner work dir to: {}",
this.appWorkPath.toString());
- // Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
- // overrides such as sessionTimeout. In this case, the overrides specified
- // in the application configuration have to be extracted and set before
initializing HelixManager.
- HelixUtils.setSystemProperties(config);
-
this.isContainerExitOnHealthCheckFailureEnabled =
ConfigUtils.getBoolean(config,
GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED,
GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED);
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index f70a741..b825417 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -47,15 +46,11 @@ import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.listeners.JobListener;
-import org.apache.gobblin.util.ConfigUtils;
import static org.apache.helix.task.TaskState.STOPPED;
@@ -326,19 +321,6 @@ public class HelixUtils {
}
/**
- * Return the system properties from the input {@link Config} instance
- * @param config
- */
- public static void setSystemProperties(Config config) {
- Properties properties =
ConfigUtils.configToProperties(ConfigUtils.getConfig(config,
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX,
- ConfigFactory.empty()));
-
- for (Map.Entry<Object, Object> entry: properties.entrySet()) {
- System.setProperty(entry.getKey().toString(),
entry.getValue().toString());
- }
- }
-
- /**
* A utility method that returns all current live instances in a given Helix
cluster. This method assumes that
* the passed {@link HelixManager} instance is already connected.
* @param helixManager
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index ee5d7cf..4f58b2e 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
@@ -60,4 +61,22 @@ public class GobblinClusterUtilsTest {
.getAppWorkDirPathFromConfig(ConfigFactory.empty(), mockFs,
TEST_APP_NAME, TEST_APP_ID);
assertEquals(PathUtils.combinePaths(DEFAULT_HOME_DIR, TEST_APP_NAME,
TEST_APP_ID), workDirPath);
}
+
+ @Test
+ public void testSetSystemProperties() {
+ //Set a dummy property before calling
GobblinClusterUtils#setSystemProperties() and assert that this property and
value
+ //exists even after the call to the setSystemProperties() method.
+ System.setProperty("prop1", "val1");
+
+ Config config =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".prop2",
+ ConfigValueFactory.fromAnyRef("val2"))
+
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".prop3", ConfigValueFactory.fromAnyRef("val3"));
+
+ GobblinClusterUtils.setSystemProperties(config);
+
+ Assert.assertEquals(System.getProperty("prop1"), "val1");
+ Assert.assertEquals(System.getProperty("prop2"), "val2");
+ Assert.assertEquals(System.getProperty("prop3"), "val3");
+ }
+
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
index 398d4d2..d7841a8 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
@@ -33,7 +33,6 @@ import org.testng.annotations.Test;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.util.ConfigUtils;
@@ -81,23 +80,6 @@ public class HelixUtilsTest {
Assert.assertEquals(properties.getProperty("k5"), "10000");
}
- @Test
- public void testSetSystemProperties() {
- //Set a dummy property before calling HelixUtils#setSystemProperties() and
assert that this property and value
- //exists even after the call to the setSystemProperties() method.
- System.setProperty("prop1", "val1");
-
- Config config =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".prop2",
- ConfigValueFactory.fromAnyRef("val2"))
-
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX
+ ".prop3", ConfigValueFactory.fromAnyRef("val3"));
-
- HelixUtils.setSystemProperties(config);
-
- Assert.assertEquals(System.getProperty("prop1"), "val1");
- Assert.assertEquals(System.getProperty("prop2"), "val2");
- Assert.assertEquals(System.getProperty("prop3"), "val3");
- }
-
@AfterClass
public void tearDown() throws IOException {
if (this.fileSystem.exists(this.tokenFilePath.getParent())) {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index 5ae3f82..433dfaf 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -25,9 +25,6 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.util.logs.LogCopier;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
@@ -35,13 +32,11 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +56,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -85,9 +81,8 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
public GobblinApplicationMaster(String applicationName, String
applicationId, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
- super(applicationName, applicationId,
GobblinClusterUtils.addDynamicConfig(config
- .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
-
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+ super(applicationName, applicationId,
config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString()))),
Optional.<Path>absent());
String containerLogDir =
config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d3d91df..f6d7949 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -62,9 +62,9 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
public GobblinYarnTaskRunner(String applicationName, String applicationId,
String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
- super(applicationName, helixInstanceName, applicationId,
getTaskRunnerId(containerId),
-
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
-
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
appWorkDirOptional);
+ super(applicationName, helixInstanceName, applicationId,
getTaskRunnerId(containerId), config
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString()))),
appWorkDirOptional);
}
@Override