This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 66e201c [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRu… 66e201c is described below commit 66e201ceefad1b97fcad83b50f2954e48ef2d0f4 Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Fri Jan 24 14:37:22 2020 -0800 [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRu… Closes #2874 from sv2000/helixInstanceTags --- .../cluster/GobblinClusterConfigurationKeys.java | 3 +++ .../org/apache/gobblin/cluster/GobblinTaskRunner.java | 3 +++ .../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 12 +++++++++--- .../java/org/apache/gobblin/yarn/YarnService.java | 19 +++++++++++++------ 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index fdf1eca..28339f4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -67,6 +67,9 @@ public class GobblinClusterConfigurationKeys { public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + "work.unit.file.path"; public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name"; public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceName"; + + public static final String HELIX_INSTANCE_TAGS_OPTION_NAME = "helix_instance_tags"; + // The number of tasks that can be running concurrently in the same worker process public static final String HELIX_CLUSTER_TASK_CONCURRENCY = GOBBLIN_CLUSTER_PREFIX + "helix.taskConcurrency"; public static final int HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT = 40; diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index f3edbd1..5ada074 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; @@ -602,6 +603,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { "Application name"); options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true, "Helix instance name"); + options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME) + .hasArg(true).required(false).desc("Helix instance tags").build()); return options; } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java index d3bb523..6e8c72f 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java @@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -40,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.base.Strings; import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -189,9 +189,15 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner { ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key())); String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME); String helixInstanceName = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME); + String helixInstanceTags = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME); + Config config = ConfigFactory.load(); + if (!Strings.isNullOrEmpty(helixInstanceTags)) { + config = config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, ConfigValueFactory.fromAnyRef(helixInstanceTags)); + } + GobblinTaskRunner gobblinTaskRunner = - new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, ConfigFactory.load(), + new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config, Optional.<Path>absent()); gobblinTaskRunner.start(); } catch (ParseException pe) { @@ -199,4 +205,4 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner { System.exit(1); } } -} +} \ No newline at end of file diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java index 4910a5f..d10d3ac 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java @@ -34,7 +34,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -103,6 +103,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.JvmUtils; +import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor; import org.apache.gobblin.yarn.event.ContainerReleaseRequest; import org.apache.gobblin.yarn.event.ContainerShutdownRequest; import org.apache.gobblin.yarn.event.NewContainerRequest; @@ -123,6 +124,7 @@ public class YarnService extends AbstractIdleService { private final String applicationName; private final String applicationId; private final String appViewAcl; + private final String helixInstanceTags; private final Config config; @@ -225,6 +227,7 @@ public class YarnService extends AbstractIdleService { this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES); + this.helixInstanceTags = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null); this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ? Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) : @@ -560,7 +563,7 @@ public class YarnService extends AbstractIdleService { @VisibleForTesting protected String buildContainerCommand(Container container, String helixInstanceName) { String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName(); - return new StringBuilder() + StringBuilder containerCommand = new StringBuilder() .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java") .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) - this.jvmMemoryOverheadMbs).append("M") @@ -572,12 +575,16 @@ public class YarnService extends AbstractIdleService { .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) .append(" ").append(this.applicationName) .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) - .append(" ").append(helixInstanceName) - .append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( + .append(" ").append(helixInstanceName); + + if (!Strings.isNullOrEmpty(this.helixInstanceTags)) { + containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME) + .append(" ").append(helixInstanceTags); + } + return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( containerProcessName).append(".").append(ApplicationConstants.STDOUT) .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( - containerProcessName).append(".").append(ApplicationConstants.STDERR) - .toString(); + containerProcessName).append(".").append(ApplicationConstants.STDERR).toString(); } /**