This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9aa9e6e [GOBBLIN-875][GOBBLIN-685] Emit container health metrics when
running in cluster mode
9aa9e6e is described below
commit 9aa9e6e67ab4479cc521bb95577a85c86727e533
Author: sv2000 <[email protected]>
AuthorDate: Mon Sep 16 15:05:17 2019 -0700
[GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in
cluster mode
Closes #2729 from sv2000/metrics
---
.../apache/gobblin/aws/GobblinAWSTaskRunner.java | 2 +-
.../gobblin/cluster/ContainerHealthMetrics.java | 34 ++++++
.../cluster/ContainerHealthMetricsService.java | 134 +++++++++++++++++++++
.../cluster/GobblinClusterConfigurationKeys.java | 2 +
.../gobblin/cluster/GobblinClusterManager.java | 5 +
.../apache/gobblin/cluster/GobblinTaskRunner.java | 8 +-
.../cluster/ContainerHealthMetricsServiceTest.java | 39 ++++++
.../gobblin/yarn/GobblinApplicationMaster.java | 5 +-
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 3 +
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 7 +-
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 12 +-
11 files changed, 243 insertions(+), 8 deletions(-)
diff --git
a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
index cd2045f..4bff1ff 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
@@ -84,7 +84,7 @@ public class GobblinAWSTaskRunner extends GobblinTaskRunner {
@Override
public List<Service> getServices() {
- return Collections.emptyList();
+ return super.getServices();
}
@Override
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
new file mode 100644
index 0000000..2bf187c
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+public class ContainerHealthMetrics { // Holder for the names of the container health gauges.
+ public static final String CONTAINER_METRICS_PREFIX =
"container.health.metrics."; // common prefix of every metric name below
+ // Per-metric names; ContainerHealthMetricsService#buildGaugeList() creates one gauge for each of them.
+ public static final String PROCESS_CPU_LOAD = CONTAINER_METRICS_PREFIX +
"processCpuLoad";
+ public static final String PROCESS_CPU_TIME = CONTAINER_METRICS_PREFIX +
"processCpuTime";
+ public static final String PROCESS_HEAP_USED_SIZE = CONTAINER_METRICS_PREFIX
+ "processHeapUsedSize";
+ public static final String SYSTEM_CPU_LOAD = CONTAINER_METRICS_PREFIX +
"systemCpuLoad";
+ public static final String SYSTEM_LOAD_AVG = CONTAINER_METRICS_PREFIX +
"systemLoadAvg";
+ public static final String COMMITTED_VMEM_SIZE = CONTAINER_METRICS_PREFIX +
"committedVmemSize";
+ public static final String FREE_SWAP_SPACE_SIZE = CONTAINER_METRICS_PREFIX +
"freeSwapSpaceSize";
+ public static final String TOTAL_SWAP_SPACE_SIZE = CONTAINER_METRICS_PREFIX
+ "totalSwapSpaceSize";
+ public static final String NUM_AVAILABLE_PROCESSORS =
CONTAINER_METRICS_PREFIX + "numAvailableProcessors";
+ public static final String TOTAL_PHYSICAL_MEM_SIZE =
CONTAINER_METRICS_PREFIX + "totalPhysicalMemSize";
+ public static final String FREE_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX
+ "freePhysicalMemSize";
+
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
new file mode 100644
index 0000000..b043857
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.sun.management.OperatingSystemMXBean;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A scheduled service that periodically samples JVM and operating-system
+ * health figures (process and system CPU load, process CPU time, system load
+ * average, physical, swap and virtual memory sizes, heap usage) for a container.
+ *
+ * <p>
+ * The constructor registers one gauge per metric with the
+ * {@link RootMetricContext}; each gauge reads a cached value that
+ * {@link #runOneIteration()} refreshes on a fixed-rate schedule. Because this
+ * class extends {@link AbstractScheduledService}, its lifecycle can be managed
+ * by a {@link com.google.common.util.concurrent.ServiceManager}.
+ * </p>
+ * TODO: Add Garbage Collection metrics.
+*/
+public class ContainerHealthMetricsService extends AbstractScheduledService {
+ // Configuration key and default value (both in seconds) for the interval between metric refreshes.
+ private static final String
CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS =
"container.health.metrics.service.reportingIntervalSeconds";
+ private static final Long DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL = 30L;
+
+ private final long metricReportingInterval;
+ private final OperatingSystemMXBean operatingSystemMXBean;
+ private final MemoryMXBean memoryMXBean;
+ // Cached samples read by the gauges. Package-private: ContainerHealthMetricsServiceTest reads processCpuTime directly.
+ AtomicDouble processCpuLoad = new AtomicDouble(0);
+ AtomicDouble systemCpuLoad = new AtomicDouble(0);
+ AtomicDouble systemLoadAvg = new AtomicDouble(0);
+ AtomicLong committedVmemSize = new AtomicLong(0);
+ AtomicLong processCpuTime = new AtomicLong(0);
+ AtomicLong freeSwapSpaceSize = new AtomicLong(0);
+ AtomicLong numAvailableProcessors = new AtomicLong(0);
+ AtomicLong totalPhysicalMemSize = new AtomicLong(0);
+ AtomicLong totalSwapSpaceSize = new AtomicLong(0);
+ AtomicLong freePhysicalMemSize = new AtomicLong(0);
+ AtomicLong processHeapUsedSize = new AtomicLong(0);
+ /** Reads the reporting interval from the config, obtains the platform MXBeans, and registers every gauge with the RootMetricContext. */
+ public ContainerHealthMetricsService(Config config) {
+ this.metricReportingInterval = ConfigUtils.getLong(config,
CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS,
DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL);
+ this.operatingSystemMXBean =
ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
+ this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+ // Build all the gauges and register them with the metrics registry. NOTE(review): buildGaugeList() is protected and overridable, yet it is invoked here from the constructor; confirm no subclass override depends on its own fields.
+ List<ContextAwareGauge<Double>> systemMetrics = buildGaugeList();
+ systemMetrics.forEach(metric -> RootMetricContext.get().register(metric));
+ }
+
+ /**
+ * Refreshes every cached metric value from the operating-system and memory
+ * MXBeans; the gauges built by this class report these cached values.
+ *
+ * Per the {@link AbstractScheduledService} contract, if any invocation of
+ * this method throws an exception, the service transitions to the
+ * {@link com.google.common.util.concurrent.Service.State#FAILED} state and this method is no longer called.
+ */
+ @Override
+ protected void runOneIteration() throws Exception {
+ this.processCpuLoad.set(this.operatingSystemMXBean.getProcessCpuLoad());
+ this.systemCpuLoad.set(this.operatingSystemMXBean.getSystemCpuLoad());
+ this.systemLoadAvg.set(this.operatingSystemMXBean.getSystemLoadAverage()); // per the JDK Javadoc, negative when the load average is unavailable
+
this.committedVmemSize.set(this.operatingSystemMXBean.getCommittedVirtualMemorySize());
+ this.processCpuTime.set(this.operatingSystemMXBean.getProcessCpuTime());
+
this.freeSwapSpaceSize.set(this.operatingSystemMXBean.getFreeSwapSpaceSize());
+
this.numAvailableProcessors.set(this.operatingSystemMXBean.getAvailableProcessors());
+
this.totalPhysicalMemSize.set(this.operatingSystemMXBean.getTotalPhysicalMemorySize());
+
this.totalSwapSpaceSize.set(this.operatingSystemMXBean.getTotalSwapSpaceSize());
+
this.freePhysicalMemSize.set(this.operatingSystemMXBean.getFreePhysicalMemorySize());
+
this.processHeapUsedSize.set(this.memoryMXBean.getHeapMemoryUsage().getUsed());
+ }
+ /** Creates one Double-valued gauge per cached sample, named by the ContainerHealthMetrics constants; long samples are converted with doubleValue(). The constructor registers the returned gauges. */
+ protected List<ContextAwareGauge<Double>> buildGaugeList() {
+ List<ContextAwareGauge<Double>> gaugeList = new ArrayList<>();
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD,
+ () -> this.processCpuLoad.get()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD,
+ () -> this.systemCpuLoad.get()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG,
+ () -> this.systemLoadAvg.get()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE,
+ () -> Long.valueOf(this.committedVmemSize.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_TIME,
+ () -> Long.valueOf(this.processCpuTime.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE,
+ () -> Long.valueOf(this.freeSwapSpaceSize.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS,
+ () -> Long.valueOf(this.numAvailableProcessors.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE,
+ () -> Long.valueOf(this.totalPhysicalMemSize.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE,
+ () -> Long.valueOf(this.totalSwapSpaceSize.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE,
+ () -> Long.valueOf(this.freePhysicalMemSize.get()).doubleValue()));
+
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE,
+ () -> Long.valueOf(this.processHeapUsedSize.get()).doubleValue()));
+ return gaugeList;
+ }
+
+ /**
+ * Returns a fixed-rate {@link Scheduler} with no initial delay and a period
+ * of {@code metricReportingInterval} seconds. This method will only be
+ * called once.
+ */
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedRateSchedule(0, this.metricReportingInterval,
TimeUnit.SECONDS);
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 15c197f..4b21f4e 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -167,4 +167,6 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS =
GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS =
300;
+ public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED =
GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
+ public static final boolean DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED
= false;
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c241c9b..2826720 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -191,6 +191,11 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
this.applicationLauncher.addService(this.jobScheduler);
this.jobConfigurationManager = buildJobConfigurationManager(config);
this.applicationLauncher.addService(this.jobConfigurationManager);
+
+ if (ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+
GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED))
{
+ this.applicationLauncher.addService(new
ContainerHealthMetricsService(config));
+ }
}
/**
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index fd17216..aad3908 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -196,6 +196,7 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
this.services.addAll(suite.getServices());
this.services.addAll(getServices());
+
if (this.services.isEmpty()) {
this.serviceManager = null;
} else {
@@ -335,7 +336,12 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
* @return a {@link List} of additional {@link Service}s to run.
*/
protected List<Service> getServices() {
- return new ArrayList<>();
+ List<Service> serviceList = new ArrayList<>();
+ if (ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+
GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED))
{
+ serviceList.add(new ContainerHealthMetricsService(config));
+ }
+ return serviceList;
}
@VisibleForTesting
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
new file mode 100644
index 0000000..ac67c01
--- /dev/null
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+public class ContainerHealthMetricsServiceTest {
+ // Samples the metrics twice via runOneIteration() and checks that the cached process CPU time does not go backwards.
+ @Test
+ public void testRunOneIteration() throws Exception {
+ Config config = ConfigFactory.empty(); // empty config: no reporting-interval override is supplied
+ ContainerHealthMetricsService service = new
ContainerHealthMetricsService(config);
+ service.runOneIteration(); // invoked directly; the service is never started, so no scheduler is involved
+ long processCpuTime1 = service.processCpuTime.get();
+ Thread.sleep(10); // brief pause between the two samples
+ service.runOneIteration();
+ long processCpuTime2 = service.processCpuTime.get();
+ Assert.assertTrue( processCpuTime1 <= processCpuTime2); // second sample must be at least the first
+ }
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index acf31b1..282156d 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -47,6 +47,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
@@ -82,7 +83,9 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
public GobblinApplicationMaster(String applicationName, ContainerId
containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
super(applicationName,
containerId.getApplicationAttemptId().getApplicationId().toString(),
- GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
+
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+ Optional.<Path>absent());
String containerLogDir =
config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 10ae50c..882b1d5 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -105,4 +105,7 @@ public class GobblinYarnConfigurationKeys {
//Constant definitions
public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE =
"log4j-yarn.properties";
public static final String JVM_USER_TIMEZONE_CONFIG = "user.timezone";
+
+ //Configuration properties relating to the container mode of execution, e.g. when a Gobblin cluster runs on Yarn
+ public static final String CONTAINER_NUM_KEY = "container.num";
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index c90306c..60e4a76 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -41,6 +41,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterUtils;
@@ -53,18 +54,18 @@ import
org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
public class GobblinYarnTaskRunner extends GobblinTaskRunner {
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinTaskRunner.class);
- private final ContainerId containerId;
public GobblinYarnTaskRunner(String applicationName, String
helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
super(applicationName, helixInstanceName, getApplicationId(containerId),
getTaskRunnerId(containerId),
- GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
- this.containerId = containerId;
+
GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+
ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
appWorkDirOptional);
}
@Override
public List<Service> getServices() {
List<Service> services = new ArrayList<>();
+ services.addAll(super.getServices());
if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
LOGGER.info("Adding YarnContainerSecurityManager since login is keytab
based");
services.add(new YarnContainerSecurityManager(this.config, this.fs,
this.eventBus));
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index c88d7fe..7b7f8f5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
@@ -133,4 +131,14 @@ public class YarnHelixUtils {
return environmentVariableMap;
}
+
+ /**
+ * Builds a container label: the literal prefix "container-" followed by the
+ * text after the last underscore of the given container ID. If the ID has no
+ * underscore, lastIndexOf returns -1 and the whole ID is appended.
+ * @param containerId e.g. "container_e94_1567552810874_2132400_01_000001"
+ * @return the prefixed sequence number, e.g. "container-000001"
+ */
+ public static String getContainerNum(String containerId) {
+ return "container-" + containerId.substring(containerId.lastIndexOf("_") +
1);
+ }
}