This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bb0c40bfb [GOBBLIN-2172] Enable launching GoT YARN AM and workers with proxy HTTPS host+port (#4075)
4bb0c40bfb is described below

commit 4bb0c40bfbf75133dd6b1820f0718cd22fed2f00
Author: abhishekmjain <[email protected]>
AuthorDate: Fri Nov 15 01:30:36 2024 +0530

    [GOBBLIN-2172] Enable launching GoT YARN AM and workers with proxy HTTPS host+port (#4075)
---
 .../azkaban/AzkabanGobblinYarnAppLauncher.java     |  3 +-
 .../apache/gobblin/temporal/yarn/YarnService.java  |  6 ++
 .../apache/gobblin/util/AzkabanLauncherUtils.java  | 59 ++++++++++++++++++
 .../apache/gobblin/AzkabanLauncherUtilsTest.java   | 70 ++++++++++++++++++++++
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  6 ++
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |  6 ++
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   | 12 +++-
 7 files changed, 160 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index db279355fa..be3ad5c8f4 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -32,6 +32,7 @@ import com.typesafe.config.ConfigValueFactory;
 import azkaban.jobExecutor.AbstractJob;
 import lombok.Getter;
 
+import org.apache.gobblin.util.AzkabanLauncherUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
 import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
@@ -63,7 +64,7 @@ public class AzkabanGobblinYarnAppLauncher extends 
AbstractJob {
   public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps)
       throws IOException {
     super(jobId, LOGGER);
-
+    gobblinProps = 
AzkabanLauncherUtils.undoPlaceholderConversion(gobblinProps);
     addRuntimeProperties(gobblinProps);
 
     Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 47d76b963e..c8fbd047c5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -157,6 +158,7 @@ class YarnService extends AbstractIdleService {
 
   private final Optional<String> containerJvmArgs;
   private final String containerTimezone;
+  private final String proxyJvmArgs;
 
   @Getter(AccessLevel.PROTECTED)
   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
@@ -240,6 +242,9 @@ class YarnService extends AbstractIdleService {
         
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
 :
         Optional.<String>absent();
 
+    this.proxyJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
+        
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) 
: StringUtils.EMPTY;
+
     int numContainerLaunchThreads =
         ConfigUtils.getInt(config, 
GobblinYarnConfigurationKeys.MAX_CONTAINER_LAUNCH_THREADS_KEY,
             GobblinYarnConfigurationKeys.DEFAULT_MAX_CONTAINER_LAUNCH_THREADS);
@@ -594,6 +599,7 @@ class YarnService extends AbstractIdleService {
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
         .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
+        .append(" ").append(this.proxyJvmArgs)
         .append(" ").append(GobblinTemporalYarnTaskRunner.class.getName())
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java
new file mode 100644
index 0000000000..ca6d98a5b7
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanLauncherUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableBiMap;
+
+/**
+ * Utility class for Azkaban App Launcher.
+ */
+public class AzkabanLauncherUtils {
+  public static final String PLACEHOLDER_MAP_KEY = "placeholderMap";
+
+  /**
+   * Reverts properties that were converted to placeholders back to their 
original values.
+   * It checks if the properties contain placeholderMap key and, if so, uses 
it as an inverse map
+   * to replace placeholder values with their original values.
+   *
+   * @param appProperties the properties object containing the application 
properties alongwith the inverse map
+   * @return a new Properties object with placeholders reverted to their 
original values, or the original properties if no placeholderMap
+   */
+  public static Properties undoPlaceholderConversion(Properties appProperties) 
{
+    if 
(StringUtils.EMPTY.equals(appProperties.getProperty(PLACEHOLDER_MAP_KEY, 
StringUtils.EMPTY))) {
+      return appProperties;
+    }
+
+    Properties convertedProperties = new Properties();
+    convertedProperties.putAll(appProperties);
+
+    // Undo properties converted to placeholders
+    Map<String, String> inversePlaceholderMap = 
ImmutableBiMap.copyOf(Splitter.on(",").withKeyValueSeparator(":")
+        
.split(convertedProperties.get(PLACEHOLDER_MAP_KEY).toString())).inverse();
+    for (Map.Entry<Object, Object> entry : convertedProperties.entrySet()) {
+      if (inversePlaceholderMap.containsKey(entry.getValue().toString())) {
+        convertedProperties.put(entry.getKey(), 
inversePlaceholderMap.get(entry.getValue().toString()));
+      }
+    }
+    return convertedProperties;
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java
new file mode 100644
index 0000000000..8aed8a11cd
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/AzkabanLauncherUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.util.AzkabanLauncherUtils;
+
+
+@Test
+public class AzkabanLauncherUtilsTest {
+
+  @Test
+  public void testPropertyPlaceholderReplacement() {
+    Properties props = new Properties();
+
+    props.setProperty("placeholderMap", ":emptyStringPlaceholder, 
:spacePlaceholder,\\t:tabPlaceholder");
+    props.setProperty("key1", "emptyStringPlaceholder");
+    props.setProperty("key2", "spacePlaceholder");
+    props.setProperty("key3", "tabPlaceholder");
+    props.setProperty("key4", "someOtherValue");
+    props.setProperty("key5", "123emptyStringPlaceholder");
+
+    props = AzkabanLauncherUtils.undoPlaceholderConversion(props);
+    Assert.assertEquals(props.get("key1").toString(), "");
+    Assert.assertEquals(props.get("key2").toString(), " ");
+    Assert.assertEquals(props.get("key3").toString(), "\\t");
+    Assert.assertEquals(props.get("key4").toString(), "someOtherValue");
+
+    // should replace exact matches only
+    Assert.assertEquals(props.get("key5").toString(), 
"123emptyStringPlaceholder");
+  }
+
+  @Test
+  public void testPlaceholderMapMissing() {
+    Properties props = new Properties();
+    props.setProperty("key1", "emptyStringPlaceholder");
+
+    props = AzkabanLauncherUtils.undoPlaceholderConversion(props);
+    Assert.assertEquals(props.get("key1").toString(), 
"emptyStringPlaceholder");
+  }
+
+  @Test
+  public void testEmptyPlaceholderMap() {
+    Properties props = new Properties();
+    props.setProperty("placeholderMap", "");
+    props.setProperty("key1", "emptyStringPlaceholder");
+
+    props = AzkabanLauncherUtils.undoPlaceholderConversion(props);
+    Assert.assertEquals(props.get("key1").toString(), 
"emptyStringPlaceholder");
+  }
+}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 01655e9fab..031e295aa0 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.mail.EmailException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -209,6 +210,7 @@ public class GobblinYarnAppLauncher {
   private final long appReportIntervalMinutes;
 
   private final Optional<String> appMasterJvmArgs;
+  private final String proxyJvmArgs;
 
   private final String sinkLogRootDir;
 
@@ -280,6 +282,9 @@ public class GobblinYarnAppLauncher {
         
Optional.of(config.getString(GobblinYarnConfigurationKeys.APP_MASTER_JVM_ARGS_KEY))
 :
         Optional.<String>absent();
 
+    this.proxyJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
+        
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) 
: StringUtils.EMPTY;
+
     this.sinkLogRootDir = ConfigUtils.getString(config, 
GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY, null);
 
     this.maxGetApplicationReportFailures = 
config.getInt(GobblinYarnConfigurationKeys.MAX_GET_APP_REPORT_FAILURES_KEY);
@@ -829,6 +834,7 @@ public class GobblinYarnAppLauncher {
         .append(" 
-D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY))
         .append(" 
-D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).append("=").append(String.join(",",
 this.libJarNames))
         .append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs))
+        .append(" ").append(this.proxyJvmArgs)
         .append(" ").append(appMasterClass.getName())
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index b8fc95b29d..10bc9f9709 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -57,6 +57,12 @@ public class GobblinYarnConfigurationKeys {
 
   public static final String YARN_APPLICATION_LIB_JAR_LIST = 
GOBBLIN_YARN_PREFIX + "lib.jar.list";
 
  /**
   * Config for setting http/https proxy host and port.
   * The value for this property is expected in the format
   * -DproxySet=true -Dhttps.proxyHost=[hostname] -Dhttps.proxyPort=[port]
   *
   * <p>The value is appended verbatim, after the other configured JVM args and before the main class name, to the
   * launch command of the YARN ApplicationMaster (built by {@code GobblinYarnAppLauncher}) and of the worker
   * containers (built by the temporal {@code YarnService}). When unset, an empty string is appended instead.
   */
  public static final String YARN_APPLICATION_PROXY_JVM_ARGS = GOBBLIN_YARN_PREFIX + "proxy.jvm.args";
+
   // Used to store the start time of the app launcher to propagate to workers 
and appmaster
   public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY = 
GOBBLIN_YARN_PREFIX + "application.start.time";
 
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index e1f2443448..622c6d5647 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -215,7 +215,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
         .resolve();
   }
 
-  @Test
+  @Test(dependsOnMethods = "testCreateHelixCluster")
   public void testBuildApplicationMasterCommand() {
     String command = 
this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456",
 64);
 
@@ -355,6 +355,16 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
     assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller 
messages processed");
   }
 
+  @Test(dependsOnMethods = "testCreateHelixCluster")
+  public void testProxyJvmArgs() throws IOException {
+    this.config = 
this.config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS,
+        ConfigValueFactory.fromAnyRef("-Dhttp.proxyHost=foo-bar.baz.org 
-Dhttp.proxyPort=1234"));
+    this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, 
clusterConf);
+
+    String command = 
this.gobblinYarnAppLauncher.buildApplicationMasterCommand("application_1234_3456",
 64);
+    Assert.assertTrue(command.contains("-Dhttp.proxyHost=foo-bar.baz.org 
-Dhttp.proxyPort=1234"));
+  }
+
   @AfterClass
   public void tearDown() throws IOException, TimeoutException {
     try {

Reply via email to