This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c574dff [GOBBLIN-1076] Make Gobblin cluster working directories
configurable
c574dff is described below
commit c574dff7d4e66b126986510c08c55aab74b03964
Author: sv2000 <[email protected]>
AuthorDate: Tue Mar 10 17:23:14 2020 -0700
[GOBBLIN-1076] Make Gobblin cluster working directories configurable
Closes #2916 from sv2000/clusterWorkDir
---
.../cluster/GobblinClusterConfigurationKeys.java | 1 +
.../gobblin/cluster/GobblinClusterManager.java | 20 +--------
.../gobblin/cluster/GobblinClusterUtils.java | 32 +++++++++++++--
.../apache/gobblin/cluster/GobblinTaskRunner.java | 25 +++---------
.../org/apache/gobblin/cluster/SingleTask.java | 2 -
.../apache/gobblin/cluster/SingleTaskRunner.java | 4 +-
.../gobblin/cluster/GobblinClusterUtilsTest.java | 47 +++++++++++-----------
.../org/apache/gobblin/cluster/TestSingleTask.java | 40 +++++++++---------
.../cluster/suite/IntegrationBasicSuite.java | 6 +--
.../test/resources/_workunits/store/workunit.wu | 0
gobblin-cluster/src/test/resources/clusterConf | 1 -
.../gobblin/yarn/GobblinYarnAppLauncher.java | 17 ++++----
.../java/org/apache/gobblin/yarn/YarnService.java | 2 +-
13 files changed, 96 insertions(+), 101 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 7c27d20..7dbf7b1 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -45,6 +45,7 @@ public class GobblinClusterConfigurationKeys {
public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
public static final String STANDALONE_CLUSTER_MODE_KEY =
GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
+ // Root working directory for Gobblin cluster
public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX +
"workDir";
public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED =
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2d967d6..8eabad1 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -72,7 +71,6 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -156,9 +154,10 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
initializeHelixManager();
- this.fs = buildFileSystem(config);
+ this.fs = GobblinClusterUtils.buildFileSystem(config, new Configuration());
this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
: GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs,
clusterName, applicationId);
+ LOGGER.info("Configured GobblinClusterManager work dir to: {}",
this.appWorkDir);
initializeAppLauncherAndServices();
}
@@ -358,21 +357,6 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
}
/**
- * Build the {@link FileSystem} for the Application Master.
- */
- private FileSystem buildFileSystem(Config config) throws IOException {
- Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config,
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
-
- Configuration conf = new Configuration();
- //Add any Hadoop-specific overrides into the Configuration object
-
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
conf);
-
- return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
- .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
- : FileSystem.get(conf);
- }
-
- /**
* Build the {@link GobblinHelixJobScheduler} for the Application Master.
*/
private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config
config, Path appWorkDir,
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index e414074..a7e4ad8 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,9 +17,12 @@
package org.apache.gobblin.cluster;
+import java.io.IOException;
import java.net.InetAddress;
+import java.net.URI;
import java.net.UnknownHostException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,11 +31,13 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
-
-import static
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.gobblin.util.PathUtils;
@Alpha
@Slf4j
@@ -63,8 +68,9 @@ public class GobblinClusterUtils {
public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
String applicationName, String applicationId) {
- if (config.hasPath(CLUSTER_WORK_DIR)) {
- return new Path(config.getString(CLUSTER_WORK_DIR));
+ if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
+ return new Path(new Path(fs.getUri()),
PathUtils.combinePaths(config.getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR),
+ getAppWorkDirPath(applicationName, applicationId)));
}
return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName,
applicationId));
}
@@ -129,4 +135,22 @@ public class GobblinClusterUtils {
return getDynamicConfig(config).withFallback(config);
}
+ /**
+ * A utility method to construct a {@link FileSystem} object with the
configured Hadoop overrides provided as part of
+ * the cluster configuration.
+ * @param config
+ * @param conf
+ * @return a {@link FileSystem} object that is instantiated with the
appropriate Hadoop config overrides.
+ * @throws IOException
+ */
+ public static FileSystem buildFileSystem(Config config, Configuration conf)
+ throws IOException {
+ Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config,
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+
+ //Add any Hadoop-specific overrides into the Configuration object
+
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
conf);
+ return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+ .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+ : FileSystem.get(conf);
+ }
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index d71dbd9..6b3fff7 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -74,20 +73,15 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import static
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
-
-
/**
* The main class running in the containers managing services for running
Gobblin
* {@link org.apache.gobblin.source.workunit.WorkUnit}s.
@@ -113,6 +107,9 @@ import static
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
*/
@Alpha
public class GobblinTaskRunner implements StandardMetricsBridge {
+ // Working directory key for applications. This config is set dynamically.
+ public static final String CLUSTER_APP_WORK_DIR =
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
+
private static final Logger logger =
LoggerFactory.getLogger(GobblinTaskRunner.class);
static final java.nio.file.Path CLUSTER_CONF_PATH =
Paths.get("generated-gobblin-cluster.conf");
@@ -169,10 +166,11 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config,
GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED,
false);
Configuration conf = HadoopUtils.newConfiguration();
- this.fs = buildFileSystem(config, conf);
+ this.fs = GobblinClusterUtils.buildFileSystem(config, conf);
this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
this.clusterConfig = saveConfigToFile(config);
this.clusterName =
this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ logger.info("Configured GobblinTaskRunner work dir to: {}",
this.appWorkPath.toString());
//Set system properties passed in via application config. As an example,
Helix uses System#getProperty() for ZK configuration
// overrides such as sessionTimeout. In this case, the overrides specified
@@ -279,7 +277,7 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
private Config saveConfigToFile(Config config)
throws IOException {
Config newConf = config
- .withValue(CLUSTER_WORK_DIR,
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+ .withValue(CLUSTER_APP_WORK_DIR,
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
ConfigUtils configUtils = new ConfigUtils(new FileUtils());
configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
return newConf;
@@ -437,17 +435,6 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
});
}
- private FileSystem buildFileSystem(Config config, Configuration conf)
- throws IOException {
- Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config,
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
-
- //Add any Hadoop-specific overrides into the Configuration object
-
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
conf);
- return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
- .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
- : FileSystem.get(conf);
- }
-
private Optional<ContainerMetrics> buildContainerMetrics() {
Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
if (GobblinMetrics.isEnabled(properties)) {
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index be0f0de..4778371 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -50,8 +50,6 @@ import org.apache.gobblin.util.SerializationUtils;
public class SingleTask {
private static final Logger _logger =
LoggerFactory.getLogger(SingleTask.class);
- public static final String MAX_RETRY_WAITING_FOR_INIT_KEY =
"maxRetryBlockedOnTaskAttemptInit";
- public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
private GobblinMultiTaskAttempt _taskAttempt;
private String _jobId;
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index de97fec..8eabbba 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -44,8 +44,6 @@ import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
-import static
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
-
class SingleTaskRunner {
private static final Logger logger =
LoggerFactory.getLogger(SingleTaskRunner.class);
@@ -65,7 +63,7 @@ class SingleTaskRunner {
this.jobId = jobId;
this.workUnitFilePath = workUnitFilePath;
this.clusterConfig = ConfigFactory.parseFile(new
File(clusterConfigFilePath));
- final String workDir = this.clusterConfig.getString(CLUSTER_WORK_DIR);
+ final String workDir =
this.clusterConfig.getString(GobblinTaskRunner.CLUSTER_APP_WORK_DIR);
this.appWorkPath = new Path(workDir);
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index 9288d42..ee5d7cf 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -17,48 +17,47 @@
package org.apache.gobblin.cluster;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.util.PathUtils;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class GobblinClusterUtilsTest {
-
- FileSystem fs = mock(FileSystem.class);
+ private static final String TEST_APP_NAME = "appName";
+ private static final String TEST_APP_ID = "appId";
+ private static final String TEST_WORK_DIR = "file:///foo/bar";
+ private static final String DEFAULT_HOME_DIR = "file:///home";
@Test
- public void work_dir_should_get_value_from_config_when_specified() throws
Exception {
- Map<String, String> configMap = new HashMap<>();
- configMap.put("gobblin.cluster.workDir", "/foo/bar");
-
- Config config = ConfigFactory.parseMap(configMap);
-
- Path workDirPath = GobblinClusterUtils
- .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+ public void testGetAppWorkDirPathFromConfig() throws IOException {
+ FileSystem localFs = FileSystem.getLocal(new Configuration());
+ FileSystem mockFs = mock(FileSystem.class);
- assertEquals(new Path("/foo/bar"), workDirPath);
-
- }
-
- @Test
- public void
work_dir_should_get_default_calculated_value_when_not_specified() throws
Exception {
- Map<String, String> configMap = new HashMap<>();
- Config config = ConfigFactory.parseMap(configMap);
+ when(mockFs.getHomeDirectory()).thenReturn(new Path(DEFAULT_HOME_DIR));
+ when(mockFs.getUri()).thenReturn(localFs.getUri());
- when(fs.getHomeDirectory()).thenReturn(new Path("/home/"));
+ //Set gobblin.cluster.workDir config
+ Config config =
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR,
+ ConfigValueFactory.fromAnyRef(TEST_WORK_DIR));
+ Path workDirPath = GobblinClusterUtils.getAppWorkDirPathFromConfig(config,
localFs, TEST_APP_NAME, TEST_APP_ID);
- Path workDirPath = GobblinClusterUtils
- .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+ assertEquals(PathUtils.combinePaths(TEST_WORK_DIR, TEST_APP_NAME,
TEST_APP_ID), workDirPath);
- assertEquals(new Path("/home/appName/appid"), workDirPath);
+ //Get workdir when gobblin.cluster.workDir is not specified
+ workDirPath = GobblinClusterUtils
+ .getAppWorkDirPathFromConfig(ConfigFactory.empty(), mockFs,
TEST_APP_NAME, TEST_APP_ID);
+ assertEquals(PathUtils.combinePaths(DEFAULT_HOME_DIR, TEST_APP_NAME,
TEST_APP_ID), workDirPath);
}
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 2b6499c..04a800c 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -17,21 +17,21 @@
package org.apache.gobblin.cluster;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
-import org.apache.gobblin.testing.AssertWithBackoff;
import org.junit.Assert;
import org.testng.annotations.Test;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
-import javax.annotation.Nullable;
-
-import static
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
/**
@@ -43,14 +43,18 @@ import static
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_K
*/
public class TestSingleTask {
- private InMemorySingleTaskRunner createInMemoryTaskRunner() {
- final String clusterConfigPath = "clusterConf";
- final String wuPath = "_workunits/store/workunit.wu";
- String clusterConfPath =
this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
+ private InMemorySingleTaskRunner createInMemoryTaskRunner()
+ throws IOException {
+ final File clusterWorkDirPath = Files.createTempDir();
+ Path clusterConfigPath = Paths.get(clusterWorkDirPath.getAbsolutePath(),
"clusterConf");
+ Config config =
ConfigFactory.empty().withValue(GobblinTaskRunner.CLUSTER_APP_WORK_DIR,
ConfigValueFactory.fromAnyRef(clusterWorkDirPath.toString()));
+ ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+ configUtils.saveConfigToFile(config, clusterConfigPath);
- InMemorySingleTaskRunner inMemorySingleTaskRunner = new
InMemorySingleTaskRunner(clusterConfPath, "testJob",
- this.getClass().getClassLoader().getResource(wuPath).getPath());
+ final Path wuPath = Paths.get(clusterWorkDirPath.getAbsolutePath(),
"_workunits/store/workunit.wu");
+ InMemorySingleTaskRunner inMemorySingleTaskRunner =
+ new InMemorySingleTaskRunner(clusterConfigPath.toString(), "testJob",
wuPath.toString());
return inMemorySingleTaskRunner;
}
@@ -60,15 +64,13 @@ public class TestSingleTask {
* re-run it again.
*/
@Test
- public void testSingleTaskRerunAfterFailure()
- throws Exception {
- SingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
+ public void testSingleTaskRerunAfterFailure() throws Exception {
+ InMemorySingleTaskRunner inMemorySingleTaskRunner =
createInMemoryTaskRunner();
try {
inMemorySingleTaskRunner.run(true);
} catch (Exception e) {
inMemorySingleTaskRunner.run();
}
-
Assert.assertTrue(true);
}
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index 7da4915..e6c4587 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -130,8 +130,8 @@ public class IntegrationBasicSuite {
}
private void initWorkDir() throws IOException {
- // Relative to the current directory
- this.workPath = Paths.get("gobblin-integration-test-work-dir");
+ this.workPath =
Paths.get(ConfigFactory.parseURL(Resources.getResource("BasicCluster.conf"))
+ .getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR));
log.info("Created a new work directory: " +
this.workPath.toAbsolutePath());
// Delete the working directory in case the previous test fails to delete
the directory
@@ -182,7 +182,7 @@ public class IntegrationBasicSuite {
Map<String, String> configMap = new HashMap<>();
String zkConnectionString = this.testingZKServer.getConnectString();
configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY,
zkConnectionString);
- configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR,
this.workPath.toString());
+ configMap.put(GobblinTaskRunner.CLUSTER_APP_WORK_DIR,
this.workPath.toString());
Config overrideConfig = ConfigFactory.parseMap(configMap);
return overrideConfig.withFallback(config);
diff --git a/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu
b/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu
deleted file mode 100644
index e69de29..0000000
diff --git a/gobblin-cluster/src/test/resources/clusterConf
b/gobblin-cluster/src/test/resources/clusterConf
deleted file mode 100644
index 468d172..0000000
--- a/gobblin-cluster/src/test/resources/clusterConf
+++ /dev/null
@@ -1 +0,0 @@
-gobblin.cluster.workDir = <FILL IN THE PATH TO THE OWNING DIRECTORY>
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 543381a..3367842 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -238,9 +238,7 @@ public class GobblinYarnAppLauncher {
this.yarnClient = YarnClient.createYarnClient();
this.yarnClient.init(this.yarnConfiguration);
- this.fs = config.hasPath(ConfigurationKeys.FS_URI_KEY) ?
-
FileSystem.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)),
this.yarnConfiguration) :
- FileSystem.get(this.yarnConfiguration);
+ this.fs = GobblinClusterUtils.buildFileSystem(config,
this.yarnConfiguration);
this.closer.register(this.fs);
this.applicationStatusMonitor = Executors.newSingleThreadScheduledExecutor(
@@ -348,7 +346,7 @@ public class GobblinYarnAppLauncher {
!this.config.getBoolean(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY))
{
services.add(buildLogCopier(this.config,
new Path(this.sinkLogRootDir, this.applicationName + Path.SEPARATOR +
this.applicationId.get().toString()),
- GobblinClusterUtils.getAppWorkDirPath(this.fs, this.applicationName,
this.applicationId.get().toString())));
+ GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, this.applicationId.get().toString())));
}
if (config.getBoolean(ConfigurationKeys.JOB_EXECINFO_SERVER_ENABLED_KEY)) {
LOGGER.info("Starting the job execution info server since it is
enabled");
@@ -604,8 +602,10 @@ public class GobblinYarnAppLauncher {
}
private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId
applicationId) throws IOException {
- Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs,
this.applicationName, applicationId.toString());
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
+
Path appMasterWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
+ LOGGER.info("Configured GobblinApplicationMaster work directory to: {}",
appMasterWorkDir.toString());
Map<String, LocalResource> appMasterResources = Maps.newHashMap();
FileSystem localFs = FileSystem.getLocal(new Configuration());
@@ -639,8 +639,11 @@ public class GobblinYarnAppLauncher {
}
private void addContainerLocalResources(ApplicationId applicationId) throws
IOException {
- Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs,
this.applicationName, applicationId.toString());
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
+
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+ LOGGER.info("Configured Container work directory to: {}",
containerWorkDir.toString());
+
FileSystem localFs = FileSystem.getLocal(new Configuration());
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
@@ -850,7 +853,7 @@ public class GobblinYarnAppLauncher {
@VisibleForTesting
void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException
{
- Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs,
this.applicationName, applicationId.toString());
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, applicationId.toString());
if (this.fs.exists(appWorkDir)) {
LOGGER.info("Deleting application working directory " + appWorkDir);
this.fs.delete(appWorkDir, true);
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 0c649ba..027be31 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -487,7 +487,7 @@ public class YarnService extends AbstractIdleService {
protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
throws IOException {
- Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs,
this.applicationName, this.applicationId);
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, this.applicationId);
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Map<String, LocalResource> resourceMap = Maps.newHashMap();