This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new d761f66  [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager
d761f66 is described below

commit d761f66c28523f17948790c2a7099181788e3408
Author: sv2000 <sudarsh...@gmail.com>
AuthorDate: Fri Jan 31 08:49:54 2020 -0800

    [GOBBLIN-1036] Add hadoop override configurations when instantiating FileSystem object in GobblinTaskRunner and GobblinClusterManager

    Closes #2878 from sv2000/parallelRunner
---
 .../gobblin/cluster/GobblinClusterConfigurationKeys.java |  2 ++
 .../org/apache/gobblin/cluster/GobblinClusterManager.java | 12 ++++++++++--
 .../java/org/apache/gobblin/cluster/GobblinTaskRunner.java |  9 ++++++++-
 .../apache/gobblin/cluster/GobblinClusterManagerTest.java | 12 ++++++++++++
 .../org/apache/gobblin/cluster/GobblinTaskRunnerTest.java | 13 +++++++++++++
 5 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 28339f4..787130e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -181,4 +181,6 @@ public class GobblinClusterConfigurationKeys {
   // the cluster
   public static final String IS_HELIX_CLUSTER_MANAGED = GOBBLIN_CLUSTER_PREFIX + "isHelixClusterManaged";
   public static final boolean DEFAULT_IS_HELIX_CLUSTER_MANAGED = false;
+
+  public static final String HADOOP_CONFIG_OVERRIDES_PREFIX = GOBBLIN_CLUSTER_PREFIX + "hadoop.inject";
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2826720..e1e8fd6 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -73,6 +73,7 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.JobConfigurationUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -114,6 +115,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri protected final Path appWorkDir; + @Getter protected final FileSystem fs; protected final String applicationId; @@ -353,9 +355,15 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri * Build the {@link FileSystem} for the Application Master. */ private FileSystem buildFileSystem(Config config) throws IOException { + Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX); + + Configuration conf = new Configuration(); + //Add any Hadoop-specific overrides into the Configuration object + JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf); + return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? 
FileSystem - .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), new Configuration()) - : FileSystem.get(new Configuration()); + .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf) + : FileSystem.get(conf); } /** diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 5ada074..22c21bf 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -69,6 +69,8 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.StandardMetricsBridge; @@ -77,6 +79,7 @@ import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.FileUtils; import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.JobConfigurationUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -108,7 +111,6 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER */ @Alpha public class GobblinTaskRunner implements StandardMetricsBridge { - private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class); static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf"); @@ -140,6 +142,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { protected final Config config; + @Getter protected final FileSystem fs; private final List<Service> services = Lists.newArrayList(); protected final String applicationName; @@ -420,6 +423,10 @@ public class GobblinTaskRunner 
implements StandardMetricsBridge { private FileSystem buildFileSystem(Config config, Configuration conf) throws IOException { + Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX); + + //Add any Hadoop-specific overrides into the Configuration object + JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf); return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf) : FileSystem.get(conf); diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java index 9069c1e..840c8a6 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java @@ -21,6 +21,7 @@ import java.net.URL; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; @@ -61,6 +62,7 @@ import org.apache.gobblin.testing.AssertWithBackoff; @Test(groups = { "gobblin.cluster" }) public class GobblinClusterManagerTest implements HelixMessageTestBase { public final static Logger LOG = LoggerFactory.getLogger(GobblinClusterManagerTest.class); + public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop"; private TestingServer testingZKServer; @@ -83,6 +85,10 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase { ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString())) .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY, ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10")) + 
.withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME, + ConfigValueFactory.fromAnyRef("value")) + .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache", + ConfigValueFactory.fromAnyRef("true")) .resolve(); String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); @@ -176,6 +182,12 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase { }, "Cluster Manager shutdown"); } + @Test + public void testBuildFileSystemConfig() { + FileSystem fileSystem = this.gobblinClusterManager.getFs(); + Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value"); + } + @AfterClass public void tearDown() throws Exception { try { diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java index 286c50a..b115607 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.URL; import org.apache.curator.test.TestingServer; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,8 @@ import org.apache.gobblin.testing.AssertWithBackoff; public class GobblinTaskRunnerTest { public final static Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class); + public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop"; + private TestingServer testingZKServer; private GobblinTaskRunner gobblinTaskRunner; @@ -70,6 +73,10 @@ public class GobblinTaskRunnerTest { Config config = ConfigFactory.parseURL(url) .withValue("gobblin.cluster.zk.connection.string", 
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString())) + .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + HADOOP_OVERRIDE_PROPERTY_NAME, + ConfigValueFactory.fromAnyRef("value")) + .withValue(GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX + "." + "fs.file.impl.disable.cache", + ConfigValueFactory.fromAnyRef("true")) .resolve(); String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); @@ -104,6 +111,12 @@ public class GobblinTaskRunnerTest { }, "gobblinTaskRunner stopped"); } + @Test + public void testBuildFileSystemConfig() { + FileSystem fileSystem = this.gobblinTaskRunner.getFs(); + Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value"); + } + @AfterClass public void tearDown() throws IOException { try {