phet commented on code in PR #3785:
URL: https://github.com/apache/gobblin/pull/3785#discussion_r1333987840


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.util.List;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnLogSource;
+import org.apache.gobblin.yarn.YarnContainerSecurityManager;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+
+/**
+ * The Yarn ApplicationMaster class for Gobblin using Temporal.
+ *
+ * <p>
+ *   This class runs the {@link YarnService} for all Yarn-related stuffs like 
ApplicationMaster registration
+ *   and un-registration and Yarn container provisioning.
+ * </p>
+ *
+ * @author Yinan Li

Review Comment:
   not accurate: this `@author` attribution does not look right for this new class



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
+import org.apache.gobblin.cluster.ContainerHealthMetricsService;
+import org.apache.gobblin.cluster.ContainerMetrics;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The main class running in the containers managing services for running 
Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ * <p>
+ *   If for some reason, the container exits or gets killed, the {@link 
GobblinClusterManager} will
+ *   be notified for the completion of the container and will start a new 
container to replace this one.
+ * </p>
+ *
+ * @author Yinan Li
+ */
+@Alpha
+public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
+  // Working directory key for applications. This config is set dynamically.
+  public static final String CLUSTER_APP_WORK_DIR = 
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
+
+  private static final Logger logger = 
LoggerFactory.getLogger(GobblinTemporalTaskRunner.class);
+
+  static final java.nio.file.Path CLUSTER_CONF_PATH = 
Paths.get("generated-gobblin-cluster.conf");
+
+  private final Optional<ContainerMetrics> containerMetrics;
+  private final Path appWorkPath;
+  private boolean isTaskDriver;
+  @Getter
+  private volatile boolean started = false;
+  private volatile boolean stopInProgress = false;
+  private volatile boolean isStopped = false;
+  @Getter
+  @Setter
+  private volatile boolean healthCheckFailed = false;
+
+  protected final String taskRunnerId;
+  protected final EventBus eventBus = new 
EventBus(GobblinTemporalTaskRunner.class.getSimpleName());
+  protected final Config clusterConfig;
+  @Getter
+  protected final FileSystem fs;
+  protected final String applicationName;
+  protected final String applicationId;
+  protected final int numTemporalWorkers;
+  protected final String temporalQueueName;
+  private final boolean isMetricReportingFailureFatal;
+  private final boolean isEventReportingFailureFatal;
+
+  public GobblinTemporalTaskRunner(String applicationName,
+      String applicationId,
+      String taskRunnerId,
+      Config config,
+      Optional<Path> appWorkDirOptional) throws Exception {
+    GobblinClusterUtils.setSystemProperties(config);
+
+    //Add dynamic config
+    config = GobblinClusterUtils.addDynamicConfig(config);
+
+    this.isTaskDriver = ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false);
+    this.taskRunnerId = taskRunnerId;
+    this.applicationName = applicationName;
+    this.applicationId = applicationId;
+    Configuration conf = HadoopUtils.newConfiguration();
+    this.fs = GobblinClusterUtils.buildFileSystem(config, conf);
+    this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
+    this.clusterConfig = saveConfigToFile(config);
+
+    logger.info("Configured GobblinTaskRunner work dir to: {}", 
this.appWorkPath.toString());
+
+    this.containerMetrics = buildContainerMetrics();
+    this.numTemporalWorkers = ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
+    this.temporalQueueName = ConfigUtils.getString(config, 
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
+        GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
+
+    this.isMetricReportingFailureFatal = 
ConfigUtils.getBoolean(this.clusterConfig,
+        ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+        ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+
+    this.isEventReportingFailureFatal = 
ConfigUtils.getBoolean(this.clusterConfig,
+        ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+        ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
+
+    logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, 
taskRunnerId {}, config {}, appWorkDir {}",
+        this.isTaskDriver ? "taskDriver" : "worker",
+        applicationName,
+        applicationId,
+        taskRunnerId,
+        config,
+        appWorkDirOptional);
+  }
+
+  public TaskRunnerSuiteBase.Builder getTaskRunnerSuiteBuilder() throws 
ReflectiveOperationException {
+    String builderStr = ConfigUtils.getString(this.clusterConfig,
+        GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
+        TaskRunnerSuiteBase.Builder.class.getName());
+
+    String hostName = "";
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      logger.warn("Cannot find host name for Helix instance: {}");
+    }
+
+    TaskRunnerSuiteBase.Builder builder = 
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
+        new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+            .resolveClass(builderStr), this.clusterConfig);
+
+    return builder.setAppWorkPath(this.appWorkPath)
+        .setContainerMetrics(this.containerMetrics)
+        .setFileSystem(this.fs)
+        .setApplicationId(applicationId)
+        .setApplicationName(applicationName)
+        .setContainerId(taskRunnerId)
+        .setHostName(hostName);
+  }
+
+  private Path initAppWorkDir(Config config, Optional<Path> 
appWorkDirOptional) {
+    return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : 
GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, 
this.applicationId);
+  }
+
+  private Config saveConfigToFile(Config config)
+      throws IOException {
+    Config newConf = config
+        .withValue(CLUSTER_APP_WORK_DIR, 
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+    ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+    configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
+    return newConf;
+  }
+
+  /**
+   * Start this {@link GobblinTemporalTaskRunner} instance.
+   */
+  public void start()
+      throws ContainerHealthCheckException {
+    logger.info("Calling start method in GobblinTemporalTaskRunner");
+    logger.info(String.format("Starting in container %s", this.taskRunnerId));
+
+    // Start metric reporting
+    initMetricReporter();
+
+    // Add a shutdown hook so the task scheduler gets properly shutdown
+    addShutdownHook();
+
+    try {
+      for (int i = 0; i < this.numTemporalWorkers; i++) {
+        initiateWorker();
+      }
+    }catch (Exception e) {
+      logger.info(e + " for initiate workers");
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void initiateWorker() throws Exception{
+    logger.info("Starting Temporal Worker");
+
+    String connectionUri = 
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
+    WorkflowServiceStubs service = 
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
+
+    String namespace = ConfigUtils.getString(clusterConfig, 
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
+            
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+    WorkflowClient client = 
TemporalWorkflowClientFactory.createClientInstance(service, namespace);
+
+    String workerClassName = ConfigUtils.getString(clusterConfig,
+        GobblinTemporalConfigurationKeys.WORKER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+    AbstractTemporalWorker worker = 
GobblinConstructorUtils.invokeLongestConstructor(
+        (Class<AbstractTemporalWorker>) Class.forName(workerClassName), 
clusterConfig, client);
+    worker.start();

Review Comment:
   not sure how important this may be for proper resource management, but these 
workers do have a corresponding `shutdown()` method, which I don't see called 
anywhere.
   
   if challenging to stitch in now, at least leave a TODO



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.ContainerHealthCheckException;
+import org.apache.gobblin.cluster.ContainerHealthMetricsService;
+import org.apache.gobblin.cluster.ContainerMetrics;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.TaskRunnerSuiteBase;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The main class running in the containers managing services for running 
Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ * <p>
+ *   If for some reason, the container exits or gets killed, the {@link 
GobblinClusterManager} will
+ *   be notified for the completion of the container and will start a new 
container to replace this one.
+ * </p>
+ *
+ * @author Yinan Li

Review Comment:
   the `@author` attribution here is also not accurate



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -0,0 +1,948 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * This class is responsible for all Yarn-related stuffs including 
ApplicationMaster registration,
+ * ApplicationMaster un-registration, Yarn container management, etc.
+ *
+ * NOTE: This is a stripped down version of {@link 
org.apache.gobblin.yarn.YarnService} that is used for temporal testing
+ * without any dependency on Helix. There are some references to helix 
concepts, but they are left in for the sake of
+ * keeping some features in-tact. They don't have an actual dependency on 
helix anymore.
+ *
+ * @author Yinan Li

Review Comment:
   the `@author` attribution is not accurate here too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to