This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4bb0c40bfb [GOBBLIN-2172] Enable launching GoT YARN AM and workers
with proxy HTTPS host+port (#4075)
4bb0c40bfb is described below
commit 4bb0c40bfbf75133dd6b1820f0718cd22fed2f00
Author: abhishekmjain <[email protected]>
AuthorDate: Fri Nov 15 01:30:36 2024 +0530
[GOBBLIN-2172] Enable launching GoT YARN AM and workers with proxy HTTPS
host+port (#4075)
---
.../azkaban/AzkabanGobblinYarnAppLauncher.java | 3 +-
.../apache/gobblin/temporal/yarn/YarnService.java | 6 ++
.../apache/gobblin/util/AzkabanLauncherUtils.java | 59 ++++++++++++++++++
.../apache/gobblin/AzkabanLauncherUtilsTest.java | 70 ++++++++++++++++++++++
.../gobblin/yarn/GobblinYarnAppLauncher.java | 6 ++
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 6 ++
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 12 +++-
7 files changed, 160 insertions(+), 2 deletions(-)
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index db279355fa..be3ad5c8f4 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -32,6 +32,7 @@ import com.typesafe.config.ConfigValueFactory;
import azkaban.jobExecutor.AbstractJob;
import lombok.Getter;
+import org.apache.gobblin.util.AzkabanLauncherUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
@@ -63,7 +64,7 @@ public class AzkabanGobblinYarnAppLauncher extends
AbstractJob {
public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)
throws IOException {
super(jobId, LOGGER);
-
+ gobblinProps =
AzkabanLauncherUtils.undoPlaceholderConversion(gobblinProps);
addRuntimeProperties(gobblinProps);
Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 47d76b963e..c8fbd047c5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -157,6 +158,7 @@ class YarnService extends AbstractIdleService {
private final Optional<String> containerJvmArgs;
private final String containerTimezone;
+ private final String proxyJvmArgs;
@Getter(AccessLevel.PROTECTED)
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
@@ -240,6 +242,9 @@ class YarnService extends AbstractIdleService {
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
:
Optional.<String>absent();
+ this.proxyJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
+
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS)
: StringUtils.EMPTY;
+
int numContainerLaunchThreads =
ConfigUtils.getInt(config,
GobblinYarnConfigurationKeys.MAX_CONTAINER_LAUNCH_THREADS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_MAX_CONTAINER_LAUNCH_THREADS);
@@ -594,6 +599,7 @@ class YarnService extends AbstractIdleService {
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
+ .append(" ").append(this.proxyJvmArgs)
.append(" ").append(GobblinTemporalYarnTaskRunner.class.getName())
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java
new file mode 100644
index 0000000000..ca6d98a5b7
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableBiMap;
+
+/**
+ * Utility class for Azkaban App Launcher.
+ */
+public class AzkabanLauncherUtils {
+ public static final String PLACEHOLDER_MAP_KEY = "placeholderMap";
+
+ /**
+ * Reverts properties that were converted to placeholders back to their
original values.
+ * It checks if the properties contain the placeholderMap key and, if so,
inverts it
+ * (original:placeholder pairs) to replace placeholder values with their original values.
+ *
+ * @param appProperties the properties object containing the application
properties along with the inverse map
+ * @return a new Properties object with placeholders reverted to their
original values, or the original properties if placeholderMap is missing or empty
+ */
+ public static Properties undoPlaceholderConversion(Properties appProperties)
{
+ if
(StringUtils.EMPTY.equals(appProperties.getProperty(PLACEHOLDER_MAP_KEY,
StringUtils.EMPTY))) {
+ return appProperties;
+ }
+
+ Properties convertedProperties = new Properties();
+ convertedProperties.putAll(appProperties);
+
+ // Undo properties converted to placeholders
+ Map<String, String> inversePlaceholderMap =
ImmutableBiMap.copyOf(Splitter.on(",").withKeyValueSeparator(":")
+
.split(convertedProperties.get(PLACEHOLDER_MAP_KEY).toString())).inverse();
+ for (Map.Entry<Object, Object> entry : convertedProperties.entrySet()) {
+ if (inversePlaceholderMap.containsKey(entry.getValue().toString())) {
+ convertedProperties.put(entry.getKey(),
inversePlaceholderMap.get(entry.getValue().toString()));
+ }
+ }
+ return convertedProperties;
+ }
+}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java
new file mode 100644
index 0000000000..8aed8a11cd
--- /dev/null
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.util.AzkabanLauncherUtils;
+
+
+@Test
+public class AzkabanLauncherUtilsTest {
+
+ @Test
+ public void testPropertyPlaceholderReplacement() {
+ Properties props = new Properties();
+
+ props.setProperty("placeholderMap", ":emptyStringPlaceholder,
:spacePlaceholder,\\t:tabPlaceholder");
+ props.setProperty("key1", "emptyStringPlaceholder");
+ props.setProperty("key2", "spacePlaceholder");
+ props.setProperty("key3", "tabPlaceholder");
+ props.setProperty("key4", "someOtherValue");
+ props.setProperty("key5", "123emptyStringPlaceholder");
+
+ props = AzkabanLauncherUtils.undoPlaceholderConversion(props);
+ Assert.assertEquals(props.get("key1").toString(), "");
+ Assert.assertEquals(props.get("key2").toString(), " ");
+ Assert.assertEquals(props.get("key3").toString(), "\\t");
+ Assert.assertEquals(props.get("key4").toString(), "someOtherValue");
+
+ // should replace exact matches only
+ Assert.assertEquals(props.get("key5").toString(),
"123emptyStringPlaceholder");
+ }
+
+ @Test
+ public void testPlaceholderMapMissing() {
+ Properties props = new Properties();
+ props.setProperty("key1", "emptyStringPlaceholder");
+
+ props = AzkabanLauncherUtils.undoPlaceholderConversion(props);
+ Assert.assertEquals(props.get("key1").toString(),
"emptyStringPlaceholder");
+ }
+
+ @Test
+ public void testEmptyPlaceholderMap() {
+ Properties props = new Properties();
+ props.setProperty("placeholderMap", "");
+ props.setProperty("key1", "emptyStringPlaceholder");
+
+ props = AzkabanLauncherUtils.undoPlaceholderConversion(props);
+ Assert.assertEquals(props.get("key1").toString(),
"emptyStringPlaceholder");
+ }
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 01655e9fab..031e295aa0 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.mail.EmailException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -209,6 +210,7 @@ public class GobblinYarnAppLauncher {
private final long appReportIntervalMinutes;
private final Optional<String> appMasterJvmArgs;
+ private final String proxyJvmArgs;
private final String sinkLogRootDir;
@@ -280,6 +282,9 @@ public class GobblinYarnAppLauncher {
Optional.of(config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JVM_ARGS_KEY))
:
Optional.<String>absent();
+ this.proxyJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
+
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS)
: StringUtils.EMPTY;
+
this.sinkLogRootDir = ConfigUtils.getString(config,
GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY, null);
this.maxGetApplicationReportFailures =
config.getInt(GobblinYarnConfigurationKeys.MAX_GET_APP_REPORT_FAILURES_KEY);
@@ -829,6 +834,7 @@ public class GobblinYarnAppLauncher {
.append("
-D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY))
.append("
-D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).append("=").append(String.join(",",
this.libJarNames))
.append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
+ .append(" ").append(this.proxyJvmArgs)
.append(" ").append(appMasterClass.getName())
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index b8fc95b29d..10bc9f9709 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -57,6 +57,12 @@ public class GobblinYarnConfigurationKeys {
public static final String YARN_APPLICATION_LIB_JAR_LIST =
GOBBLIN_YARN_PREFIX + "lib.jar.list";
+ /**
+ * Config for setting http/https proxy host and port.
+ * The value for this property is expected in the format -DproxySet=true
-Dhttps.proxyHost=[hostname] -Dhttps.proxyPort=[port]
+ */
+ public static final String YARN_APPLICATION_PROXY_JVM_ARGS =
GOBBLIN_YARN_PREFIX + "proxy.jvm.args";
+
// Used to store the start time of the app launcher to propagate to workers
and appmaster
public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY =
GOBBLIN_YARN_PREFIX + "application.start.time";
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index e1f2443448..622c6d5647 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -215,7 +215,7 @@ public class GobblinYarnAppLauncherTest implements
HelixMessageTestBase {
.resolve();
}
- @Test
+ @Test(dependsOnMethods = "testCreateHelixCluster")
public void testBuildApplicationMasterCommand() {
String command =
this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456",
64);
@@ -355,6 +355,16 @@ public class GobblinYarnAppLauncherTest implements
HelixMessageTestBase {
assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller
messages processed");
}
+ @Test(dependsOnMethods = "testCreateHelixCluster")
+ public void testProxyJvmArgs() throws IOException {
+ this.config =
this.config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS,
+ ConfigValueFactory.fromAnyRef("-Dhttp.proxyHost=foo-bar.baz.org
-Dhttp.proxyPort=1234"));
+ this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config,
clusterConf);
+
+ String command =
this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456",
64);
+ Assert.assertTrue(command.contains("-Dhttp.proxyHost=foo-bar.baz.org
-Dhttp.proxyPort=1234"));
+ }
+
@AfterClass
public void tearDown() throws IOException, TimeoutException {
try {