This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 66e201c [GOBBLIN-1032] Provide Helix instance tags config to
GobblinYarnTaskRunner
66e201c is described below
commit 66e201ceefad1b97fcad83b50f2954e48ef2d0f4
Author: sv2000 <[email protected]>
AuthorDate: Fri Jan 24 14:37:22 2020 -0800
[GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRunner
Closes #2874 from sv2000/helixInstanceTags
---
.../cluster/GobblinClusterConfigurationKeys.java | 3 +++
.../org/apache/gobblin/cluster/GobblinTaskRunner.java | 3 +++
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 12 +++++++++---
.../java/org/apache/gobblin/yarn/YarnService.java | 19 +++++++++++++------
4 files changed, 28 insertions(+), 9 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index fdf1eca..28339f4 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -67,6 +67,9 @@ public class GobblinClusterConfigurationKeys {
public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX +
"work.unit.file.path";
public static final String HELIX_INSTANCE_NAME_OPTION_NAME =
"helix_instance_name";
public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_CLUSTER_PREFIX
+ "helixInstanceName";
+
+ public static final String HELIX_INSTANCE_TAGS_OPTION_NAME =
"helix_instance_tags";
+
// The number of tasks that can be running concurrently in the same worker
process
public static final String HELIX_CLUSTER_TASK_CONCURRENCY =
GOBBLIN_CLUSTER_PREFIX + "helix.taskConcurrency";
public static final int HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT = 40;
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index f3edbd1..5ada074 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
@@ -602,6 +603,8 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
"Application name");
options.addOption("i",
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
"Helix instance name");
+
options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+ .hasArg(true).required(false).desc("Helix instance tags").build());
return options;
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d3bb523..6e8c72f 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -40,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -189,9 +189,15 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
String applicationName =
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
String helixInstanceName =
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
+ String helixInstanceTags =
cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
+ Config config = ConfigFactory.load();
+ if (!Strings.isNullOrEmpty(helixInstanceTags)) {
+ config =
config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
ConfigValueFactory.fromAnyRef(helixInstanceTags));
+ }
+
GobblinTaskRunner gobblinTaskRunner =
- new GobblinYarnTaskRunner(applicationName, helixInstanceName,
containerId, ConfigFactory.load(),
+ new GobblinYarnTaskRunner(applicationName, helixInstanceName,
containerId, config,
Optional.<Path>absent());
gobblinTaskRunner.start();
} catch (ParseException pe) {
@@ -199,4 +205,4 @@ public class GobblinYarnTaskRunner extends
GobblinTaskRunner {
System.exit(1);
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 4910a5f..d10d3ac 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -103,6 +103,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
import org.apache.gobblin.yarn.event.NewContainerRequest;
@@ -123,6 +124,7 @@ public class YarnService extends AbstractIdleService {
private final String applicationName;
private final String applicationId;
private final String appViewAcl;
+ private final String helixInstanceTags;
private final Config config;
@@ -225,6 +227,7 @@ public class YarnService extends AbstractIdleService {
this.containerHostAffinityEnabled =
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
this.helixInstanceMaxRetries =
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
+ this.helixInstanceTags = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
this.containerJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
:
@@ -560,7 +563,7 @@ public class YarnService extends AbstractIdleService {
@VisibleForTesting
protected String buildContainerCommand(Container container, String
helixInstanceName) {
String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
- return new StringBuilder()
+ StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
.append(" -Xmx").append((int) (container.getResource().getMemory() *
this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
@@ -572,12 +575,16 @@ public class YarnService extends AbstractIdleService {
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
- .append(" ").append(helixInstanceName)
- .append("
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
+ .append(" ").append(helixInstanceName);
+
+ if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
+ containerCommand.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+ .append(" ").append(helixInstanceTags);
+ }
+ return containerCommand.append("
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append("
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
- containerProcessName).append(".").append(ApplicationConstants.STDERR)
- .toString();
+
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
}
/**