This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 96e7b1620 [GOBBLIN-1989] Temporal: Implementation of GTE for GaaS 
Observability Event in MR alternative for distcp (#3843)
96e7b1620 is described below

commit 96e7b162074df9b01207aa5fd0c864af72974c0c
Author: Matthew Ho <[email protected]>
AuthorDate: Wed Jan 31 14:03:26 2024 -0500

    [GOBBLIN-1989] Temporal: Implementation of GTE for GaaS Observability Event 
in MR alternative for distcp (#3843)
    
    [GOBBLIN-1989] Temporal: Implementation of GTE for GaaS Observability Event 
in MR deprecation for distcp
---
 .../cluster/HelixRetriggeringJobCallable.java      |   3 +-
 .../apache/gobblin/instrumented/Instrumented.java  |   3 +-
 .../main/java/org/apache/gobblin/metrics/Tag.java  |   6 +-
 .../gobblin/metrics/event/EventSubmitter.java      |  13 ++-
 .../gobblin/metrics/event/GobblinEventBuilder.java |  12 ++-
 .../apache/gobblin/metrics/event/TimingEvent.java  |  13 ++-
 .../java/org/apache/gobblin/metrics/TagTest.java   |  21 ++++
 .../azkaban/AzkabanGobblinYarnAppLauncher.java     |   6 +-
 .../gobblin/runtime/AbstractJobLauncher.java       |   2 +-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   |   2 +-
 .../apache/gobblin/runtime/util/JobMetrics.java    |   1 +
 .../temporal/GobblinTemporalConfigurationKeys.java |  12 +++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |  16 ++-
 .../temporal/ddm/work/WUProcessingSpec.java        |  14 ++-
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   3 +-
 .../ddm/workflow/ProcessWorkUnitsWorkflow.java     |   1 +
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  78 ++++++++++++++-
 .../joblauncher/GobblinTemporalJobLauncher.java    |   6 +-
 .../workflow/AbstractNestingExecWorkflowImpl.java  |   4 +-
 .../workflows/helloworld/GreetingWorkflow.java     |   8 +-
 .../workflows/helloworld/GreetingWorkflowImpl.java |  23 +++--
 .../helloworld/HelloWorldJobLauncher.java          |   6 +-
 .../workflows/helloworld/HelloWorldWorker.java     |  10 +-
 .../workflows/metrics/EventSubmitterContext.java   |  84 ++++++++++++++++
 .../temporal/workflows/metrics/EventTimer.java     |  58 +++++++++++
 .../SubmitGTEActivity.java}                        |  20 ++--
 .../SubmitGTEActivityImpl.java}                    |  24 ++---
 .../workflows/metrics/TemporalEventTimer.java      | 108 +++++++++++++++++++++
 .../apache/gobblin/temporal/yarn/YarnService.java  |  98 +++++++++----------
 .../metrics/EventSubmitterContextTest.java         |  61 ++++++++++++
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  15 ++-
 31 files changed, 618 insertions(+), 113 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 2dfb4b773..220d95a46 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.JobExecutionMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -143,7 +144,7 @@ class HelixRetriggeringJobCallable implements Callable {
     this.jobsMapping = jobsMapping;
     this.locks = locks;
     this.metricContext = metricContext;
-    eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
"gobblin.runtime").build();
+    eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
JobMetrics.NAMESPACE).build();
   }
 
   private boolean isRetriggeringEnabled() {
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
index 33f4b56a8..b84acb1ae 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.instrumented;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -113,7 +114,7 @@ public class Instrumented implements Instrumentable, 
Closeable {
    * @param tags Additional tags to add to the returned context.
    * @return A {@link org.apache.gobblin.metrics.MetricContext} with the 
appropriate tags and parent.
    */
-  public static MetricContext getMetricContext(State state, Class<?> klazz, 
List<Tag<?>> tags) {
+  public static MetricContext getMetricContext(State state, Class<?> klazz, 
Collection<Tag<?>> tags) {
     int randomId = RAND.nextInt(Integer.MAX_VALUE);
 
     List<Tag<?>> generatedTags = Lists.newArrayList();
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
index 3ce4c79c2..b836a5603 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
@@ -21,14 +21,15 @@ import java.util.AbstractMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Function;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
+import javax.annotation.Nullable;
+
 
 /**
  * A class representing a dimension or property associated with a {@link 
Taggable}.
@@ -66,6 +67,7 @@ public class Tag<T> extends AbstractMap.SimpleEntry<String, 
T> {
     super(key, value);
   }
 
+  @JsonCreator
   public Tag(Map.Entry<? extends String, ? extends T> entry) {
     super(entry);
   }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 480542560..0fc4cba0e 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -17,16 +17,18 @@
 
 package org.apache.gobblin.metrics.event;
 
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import lombok.Getter;
+
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
-
-import lombok.Getter;
+import org.apache.gobblin.metrics.Tag;
 
 
 /**
@@ -204,4 +206,11 @@ public class EventSubmitter {
   public TimingEvent getTimingEvent(String name) {
     return new TimingEvent(this, name);
   }
+
+  public List<Tag<?>> getTags() {
+    return this.metricContext.isPresent() ?
+        this.metricContext.get().getTags() :
+        null;
+  }
+
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index a3c0a402d..8a626f1d4 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.metrics.event;
 
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
@@ -55,9 +57,17 @@ public class GobblinEventBuilder {
   }
 
   public GobblinEventBuilder(String name, String namespace) {
+    this(name, namespace, Maps.newHashMap());
+  }
+
+  @JsonCreator
+  private GobblinEventBuilder(
+      @JsonProperty("name") String name,
+      @JsonProperty("namespace") String namespace,
+      @JsonProperty("metadata") Map<String, String> metadata) {
     this.name = name;
     this.namespace = namespace;
-    metadata = Maps.newHashMap();
+    this.metadata = metadata;
   }
 
   public ImmutableMap<String, String> getMetadata() {
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index ff791221a..7a0d4cf68 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -128,7 +128,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     super(name);
     this.stopped = false;
     this.submitter = submitter;
-    this.startTime = System.currentTimeMillis();
+    this.startTime = getStartTime();
   }
 
   /**
@@ -138,6 +138,15 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     stop(Maps.<String, String>newHashMap());
   }
 
+
+  public Long getStartTime() {
+    return System.currentTimeMillis();
+  }
+
+  public Long getEndTime() {
+    return System.currentTimeMillis();
+  }
+
   /**
    * Stop the timer and submit the event, along with the additional metadata 
specified. If the timer was already stopped
    * before, this is a no-op.
@@ -162,7 +171,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     }
 
     this.stopped = true;
-    this.endTime = System.currentTimeMillis();
+    this.endTime = getEndTime();
     this.duration = this.endTime - this.startTime;
 
     this.metadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
index ac03e523e..eb20d5327 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
@@ -17,9 +17,16 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 
 /**
  * Unit tests for {@link Tag}.
@@ -34,6 +41,8 @@ public class TagTest {
   private static final String PROJECT_VERSION_KEY = "project.version";
   private static final int PROJECT_VERSION = 1;
 
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
   @Test
   public void testTags() {
     Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
@@ -44,4 +53,16 @@ public class TagTest {
     Assert.assertEquals(projectVersionTag.getKey(), PROJECT_VERSION_KEY);
     Assert.assertEquals(projectVersionTag.getValue().intValue(), 
PROJECT_VERSION);
   }
+
+  @Test
+  public void testSerde() throws IOException {
+    Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
+    Tag<Integer> projectVersionTag = new Tag<Integer>(PROJECT_VERSION_KEY, 
PROJECT_VERSION);
+    List<Tag<?>> tags = Arrays.asList(jobIdTag, projectVersionTag);
+    String bytes = objectMapper.writeValueAsString(tags);
+
+    JavaType type = 
objectMapper.getTypeFactory().constructCollectionType(List.class, Tag.class);
+    List<Tag<?>> deserTags = objectMapper.readValue(bytes, type);
+    Assert.assertEquals(deserTags, tags);
+  }
 }
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 8fe09e5a5..db279355f 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -135,10 +135,6 @@ public class AzkabanGobblinYarnAppLauncher extends 
AbstractJob {
 
   @Override
   public void cancel() throws Exception {
-    try {
-      this.gobblinYarnAppLauncher.stop();
-    } finally {
-      super.cancel();
-    }
+    this.gobblinYarnAppLauncher.stop();
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index fbd4294de..690dc113b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -1027,7 +1027,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
    * Build the {@link EventSubmitter} for this class.
    */
   private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> tags) {
-    return new EventSubmitter.Builder(this.runtimeMetricContext, 
"gobblin.runtime")
+    return new EventSubmitter.Builder(this.runtimeMetricContext, 
JobMetrics.NAMESPACE)
         .addMetadata(Tag.toMap(Tag.tagValuesToString(tags))).build();
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 7ebb77d11..d8ce6f12a 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -502,7 +502,7 @@ public class GobblinMultiTaskAttempt {
     }
 
     EventSubmitter.Builder eventSubmitterBuilder = new 
EventSubmitter.Builder(JobMetrics.get(this.jobId, new 
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
-        "gobblin.runtime");
+        JobMetrics.NAMESPACE);
     
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
 JobEvent.TASKS_SUBMITTED));
     eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, 
"tasksCount", Integer.toString(tasks.size()));
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 03146e1df..bb8bac9b8 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.util.ClustersNames;
  */
 @Slf4j
 public class JobMetrics extends GobblinMetrics {
+  public static final String NAMESPACE = "gobblin.runtime";
 
   public static final CreatorTag DEFAULT_CREATOR_TAG = new CreatorTag( 
"driver");
   protected final String jobName;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index f238ffc9b..182ce7daa 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -43,6 +43,18 @@ public interface GobblinTemporalConfigurationKeys {
 
   String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
 
+  /**
+   * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing 
collisions with prod jobs
+   * during testing
+   *
+   */
+  String GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = PREFIX + "job.metrics.suffix";
+  /**
+   * Default suffix for metrics emitted by GobblinTemporalJobLauncher for 
preventing collisions with prod jobs
+   * is not empty because temporal is still in alpha stages, and should not 
accidentally affect a prod job
+   */
+  String DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = "-temporal";
+
   /**
    * Number of worker processes to spin up per task runner
    * NOTE: If this size is too large, your container can OOM and halt 
execution unexpectedly. It's recommended not to touch
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 14a4bacf4..f7040f79f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,20 +17,24 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
-import io.temporal.client.WorkflowOptions;
 import java.net.URI;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
 
 import com.typesafe.config.ConfigFactory;
-import org.apache.hadoop.fs.Path;
 
+import io.temporal.client.WorkflowOptions;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
@@ -84,6 +88,12 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
         wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
       }
       Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, 
Help.loadFileSystem(wuSpec)));
+
+      wuSpec.setTags(GobblinMetrics.getCustomTagsFromState(new 
State(jobProps)));
+      wuSpec.setMetricsSuffix(this.jobProps.getProperty(
+          GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
+          
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));
+
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
           .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 3b2597194..5e3ba3dd1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,15 +18,23 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -44,6 +52,8 @@ public class WUProcessingSpec implements FileSystemApt, 
FileSystemJobStateful {
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitsDir;
   @NonNull private Tuning tuning = Tuning.DEFAULT;
+  @NonNull private List<Tag<?>> tags = new ArrayList<>();
+  @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index d425bbc4b..99c4d0536 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -29,6 +29,7 @@ import 
org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
 
 
 /** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
@@ -47,7 +48,7 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new ProcessWorkUnitImpl(), new 
CommitActivityImpl() };
+        return new Object[] { new ProcessWorkUnitImpl(), new 
CommitActivityImpl(), new SubmitGTEActivityImpl() };
     }
 
     @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index ba2ccf99a..eccad9bd5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.temporal.ddm.workflow;
 
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
+
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 141f22057..531a018c7 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,7 +16,18 @@
  */
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.jetbrains.annotations.NotNull;
 
 import com.typesafe.config.ConfigFactory;
 
@@ -25,17 +36,26 @@ import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
-import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 
 
 @Slf4j
@@ -45,6 +65,32 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
   @Override
   public int process(WUProcessingSpec workSpec) {
+    try {
+      EventSubmitterContext eventSubmitterContext = 
getEventSubmitterContext(workSpec);
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+      try (EventTimer timer = timerFactory.createJobTimer()) {
+        return performWork(workSpec);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @NotNull
+  private EventSubmitterContext getEventSubmitterContext(WUProcessingSpec 
workSpec)
+      throws IOException {
+    // NOTE: We are using the metrics tags from Job Props to create the metric 
context for the timer and NOT
+    // the deserialized jobState from HDFS that is created by the real distcp 
job. This is because the AZ runtime
+    // settings we want are for the job launcher that launched this Yarn job.
+    FileSystem fs = Help.loadFileSystemForce(workSpec);
+    JobState jobState = Help.loadJobStateUncached(workSpec, fs);
+    List<Tag<?>> tagsFromCurrentJob = workSpec.getTags();
+    String metricsSuffix = workSpec.getMetricsSuffix();
+    List<Tag<?>> tags = getTags(tagsFromCurrentJob, metricsSuffix, jobState);
+    return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+  }
+
+  private int performWork(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
     int workunitsProcessed = processingWorkflow.performWorkload(
@@ -85,4 +131,34 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
     return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
   }
+
+  private List<Tag<?>> getTags(List<Tag<?>> tagsFromCurJob, String 
metricsSuffix, JobState jobStateFromHdfs) {
+    // Construct new tags list by combining subset of tags on HDFS job state 
and the rest of the fields from the current job
+    Map<String, Tag<?>> tagsMap = new HashMap<>();
+    Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+        TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+        TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+    // Step 1, Add tags from the AZ props using the original job (the one that 
launched this yarn app)
+    tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+    // Step 2. Add tags from the jobState (the original MR job on HDFS)
+    List<String> targetKeysToAddSuffix = 
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+        .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+        .forEach(tag -> {
+          // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND 
FLOW_GROUP_FIELDS to prevent collisions when testing
+          String value = targetKeysToAddSuffix.contains(tag.getKey())
+              ? tag.getValue() + metricsSuffix
+              : String.valueOf(tag.getValue());
+          tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+        });
+
+    // Step 3: Overwrite any pre-existing metadata with name of the current 
caller
+    tagsMap.put(GobblinMetricsKeys.CLASS_META, new 
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+    return new ArrayList<>(tagsMap.values());
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index c6478b95f..f7d7255ed 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -23,10 +23,11 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
 
 import io.temporal.client.WorkflowClient;
 import io.temporal.serviceclient.WorkflowServiceStubs;
-import lombok.extern.slf4j.Slf4j;
+import io.temporal.workflow.Workflow;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -55,8 +56,9 @@ import static 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClien
  * </p>
  */
 @Alpha
-@Slf4j
 public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
+  private static final Logger log = 
Workflow.getLogger(GobblinTemporalJobLauncher.class);
+
   protected WorkflowServiceStubs workflowServiceStubs;
   protected WorkflowClient client;
   protected String queueName;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 0dcf19a77..6bef7a609 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -25,7 +25,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -33,6 +32,7 @@ import io.temporal.workflow.Async;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Promise;
 import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -109,7 +109,7 @@ public abstract class 
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
     String thisWorkflowId = Workflow.getInfo().getWorkflowId();
     String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + 
childAddr;
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
-        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         .setWorkflowId(childWorkflowId)
         .build();
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
index 2bcf421b0..c653ce326 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
@@ -20,13 +20,19 @@ package org.apache.gobblin.temporal.workflows.helloworld;
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
 @WorkflowInterface
 public interface GreetingWorkflow {
 
     /**
      * This is the method that is executed when the Workflow Execution is 
started. The Workflow
      * Execution completes when this method finishes execution.
+     *
+     * This method also shows an example of metrics emission using the {@link 
org.apache.gobblin.metrics.event.EventSubmitter} seen in
+     * non-Temporal Gobblin code.
      */
     @WorkflowMethod
-    String getGreeting(String name);
+    String getGreeting(String name, EventSubmitterContext 
eventSubmitterContext);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 774b77e15..9d2636036 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -19,11 +19,19 @@ package org.apache.gobblin.temporal.workflows.helloworld;
 
 import java.time.Duration;
 
+import org.slf4j.Logger;
+
 import io.temporal.activity.ActivityOptions;
 import io.temporal.workflow.Workflow;
 
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
 public class GreetingWorkflowImpl implements GreetingWorkflow {
 
+    private final Logger LOG = Workflow.getLogger(GreetingWorkflowImpl.class);
+
     /*
      * At least one of the following options needs to be defined:
      * - setStartToCloseTimeout
@@ -41,16 +49,19 @@ public class GreetingWorkflowImpl implements 
GreetingWorkflow {
      *
      * The activity options that were defined above are passed in as a 
parameter.
      */
-    private final FormatActivity activity = 
Workflow.newActivityStub(FormatActivity.class, options);
+    private final FormatActivity formatActivity = 
Workflow.newActivityStub(FormatActivity.class, options);
 
     // This is the entry point to the Workflow.
     @Override
-    public String getGreeting(String name) {
-
+    public String getGreeting(String name, EventSubmitterContext 
eventSubmitterContext) {
         /**
-         * If there were other Activity methods they would be orchestrated 
here or from within other Activities.
-         * This is a blocking call that returns only after the activity has 
completed.
+         * Example of the {@link TemporalEventTimer.Factory} invoking child 
activity for instrumentation.
          */
-        return activity.composeGreeting(name);
+        TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+        try (TemporalEventTimer timer = 
timerFactory.create("getGreetingTime")) {
+            LOG.info("Executing getGreeting");
+            timer.addMetadata("name", name);
+            return formatActivity.composeGreeting(name);
+        }
     }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 33183b63f..5d196fae4 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
 /**
@@ -56,6 +57,9 @@ public class HelloWorldJobLauncher extends 
GobblinTemporalJobLauncher {
   public void submitJob(List<WorkUnit> workunits) {
     WorkflowOptions options = 
WorkflowOptions.newBuilder().setTaskQueue(queueName).build();
     GreetingWorkflow greetingWorkflow = 
this.client.newWorkflowStub(GreetingWorkflow.class, options);
-    greetingWorkflow.getGreeting("Gobblin");
+    EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext(this.eventSubmitter);
+
+    String greeting = greetingWorkflow.getGreeting("Gobblin", 
eventSubmitterContext);
+    log.info(greeting);
   }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
index 274daee9c..d5d01cc33 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
@@ -22,6 +22,7 @@ import com.typesafe.config.Config;
 import io.temporal.client.WorkflowClient;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
 
 
 public class HelloWorldWorker extends AbstractTemporalWorker {
@@ -31,11 +32,16 @@ public class HelloWorldWorker extends 
AbstractTemporalWorker {
 
     @Override
     protected Class<?>[] getWorkflowImplClasses() {
-        return new Class[] { GreetingWorkflowImpl.class };
+        return new Class[] {
+            GreetingWorkflowImpl.class,
+        };
     }
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new FormatActivityImpl() };
+        return new Object[] {
+            new FormatActivityImpl(),
+            new SubmitGTEActivityImpl()
+        };
     }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
new file mode 100644
index 000000000..f0db7e7ab
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+
+/**
+ * Wrapper for sending the core essence of an {@link EventSubmitter} over the 
wire (e.g. metadata tags, namespace).
+ * This is in lieu of sending the entire {@link EventSubmitter} object over 
the wire, which is not serializable without
+ * losing some information, such as the gauges
+ */
+@Getter
+public class EventSubmitterContext {
+  private final List<Tag<?>> tags;
+  private final String namespace;
+  private final Class callerClass;
+
+  @JsonCreator
+  private EventSubmitterContext(
+      @JsonProperty("tags") List<Tag<?>> tags,
+      @JsonProperty("namespace") String namespace,
+      @JsonProperty("callerClass") Class callerClass) {
+    this.tags = tags;
+    this.namespace = namespace;
+    this.callerClass = callerClass;
+  }
+
+  public EventSubmitterContext(List<Tag<?>> tags, String namespace) {
+    // Explicitly send class over the wire to avoid any classloader issues
+    this(tags, namespace, tags.stream()
+        .filter(tag -> tag.getKey().equals(CLASS_META))
+        .findAny()
+        .map(tag -> (String) tag.getValue())
+        .map(EventSubmitterContext::resolveClass)
+        .orElse(EventSubmitterContext.class));
+  }
+
+  public EventSubmitterContext(EventSubmitter eventSubmitter) {
+    this(eventSubmitter.getTags(), eventSubmitter.getNamespace());
+  }
+
+  public EventSubmitter create() {
+    MetricContext metricContext = Instrumented.getMetricContext(new State(), 
callerClass, tags);
+    return new EventSubmitter.Builder(metricContext, namespace).build();
+  }
+
+  private static Class resolveClass(String canonicalClassName) {
+    try {
+      return Class.forName(canonicalClassName);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
new file mode 100644
index 000000000..677baf432
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.Closeable;
+
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * <p> A timer that can be used to track the duration of an event. This timer 
differs from the {@link TimingEvent} in that
+ * this class is not meant to be used outside of {@link 
io.temporal.workflow.Workflow} code. We cannot use {@link TimingEvent}
+ * because it is not serializable and cannot be passed to {@link 
io.temporal.workflow.Workflow} code due to the
+ * {@link EventSubmitter} field. It also relies on {@link 
System#currentTimeMillis()} which is not compatible with {@link 
io.temporal.workflow.Workflow}
+ * since {@link System#currentTimeMillis()} is not deterministic.</p>
+ *
+ * <p> {@link EventSubmitter} is not easily serializable because the {@link 
MetricContext} field
+ * contains bi-directional relationships via the {@link 
org.apache.gobblin.metrics.InnerGauge}. Although it's possible
+ * to write a custom serializer for {@link EventSubmitter}, it creates a 
non-obvious sleight of hand where the EventSubmitter
+ * metadata will change when crossing {@link io.temporal.workflow.Workflow} or 
{@link io.temporal.activity.Activity} boundaries. </p>
+ *
+ * <p> It differs from {@link Closeable} because the close method does not 
throw {@link java.io.IOException}. {@link TimingEvent}
+ * does this but the issue is it does not implement an interface. Inheritance 
is not a good solution either because of the
+ * {@link EventSubmitter} member variable. </p>
+ */
+public interface EventTimer extends Closeable {
+  /**
+   * Add additional metadata that will be used for post-processing when the 
timer is stopped via {@link #stop()}
+   * @param key
+   * @param metadata
+   */
+  void addMetadata(String key, String metadata);
+
+  /**
+   * Stops the timer and executes any post-processing (e.g. event submission)
+   */
+  void stop();
+
+  default void close() {
+    stop();
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
similarity index 64%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
index 2bcf421b0..aa975d10d 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.workflows.helloworld;
+package org.apache.gobblin.temporal.workflows.metrics;
 
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
 
-@WorkflowInterface
-public interface GreetingWorkflow {
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 
-    /**
-     * This is the method that is executed when the Workflow Execution is 
started. The Workflow
-     * Execution completes when this method finishes execution.
-     */
-    @WorkflowMethod
-    String getGreeting(String name);
+
+@ActivityInterface
+public interface SubmitGTEActivity {
+    @ActivityMethod
+    void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext 
eventSubmitterContext);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
similarity index 60%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
index 2bcf421b0..63bce421a 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -15,18 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.workflows.helloworld;
+package org.apache.gobblin.temporal.workflows.metrics;
 
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import org.slf4j.Logger;
 
-@WorkflowInterface
-public interface GreetingWorkflow {
+import io.temporal.workflow.Workflow;
 
-    /**
-     * This is the method that is executed when the Workflow Execution is 
started. The Workflow
-     * Execution completes when this method finishes execution.
-     */
-    @WorkflowMethod
-    String getGreeting(String name);
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+
+public class SubmitGTEActivityImpl implements SubmitGTEActivity {
+    private static Logger log = 
Workflow.getLogger(SubmitGTEActivityImpl.class);
+
+    @Override
+    public void submitGTE(GobblinEventBuilder eventBuilder, 
EventSubmitterContext eventSubmitterContext) {
+        eventSubmitterContext.create().submit(eventBuilder);
+    }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
new file mode 100644
index 000000000..0f68f1132
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.workflow.Workflow;
+import lombok.RequiredArgsConstructor;
+
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * Boilerplate for tracking elapsed time of events that is compatible with 
{@link Workflow}
+ * by using activities to record time
+ *
+ * This class is very similar to {@link TimingEvent} but uses {@link Workflow} 
compatible APIs. It's possible to refactor
+ * this class to inherit the {@link TimingEvent} but extra care would be 
needed to remove the {@link EventSubmitter} field
+ * since that class is not serializable without losing some information
+ */
+@RequiredArgsConstructor
+public class TemporalEventTimer implements EventTimer {
+  private final SubmitGTEActivity trackingEventActivity;
+  private final GobblinEventBuilder eventBuilder;
+  private final EventSubmitterContext eventSubmitterContext;
+  private final Instant startTime;
+
+  @Override
+  public void stop() {
+    stop(getCurrentTime());
+  }
+
+  @Override
+  public void addMetadata(String key, String metadata) {
+    this.eventBuilder.addMetadata(key, metadata);
+  }
+
+
+  private void stop(Instant endTime) {
+    this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, 
TimingEvent.METADATA_TIMING_EVENT);
+    this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, 
Long.toString(this.startTime.toEpochMilli()));
+    this.eventBuilder.addMetadata(TimingEvent.METADATA_END_TIME, 
Long.toString(endTime.toEpochMilli()));
+    Duration duration = Duration.between(this.startTime, endTime);
+    this.eventBuilder.addMetadata(TimingEvent.METADATA_DURATION, 
Long.toString(duration.toMillis()));
+
+    trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
+  }
+
+  private static Instant getCurrentTime() {
+    return Instant.ofEpochMilli(Workflow.currentTimeMillis());
+  }
+
+  public static class Factory {
+    private static final ActivityOptions DEFAULT_OPTS = 
ActivityOptions.newBuilder().build();
+    private final SubmitGTEActivity submitGTEActivity;
+    private final EventSubmitterContext eventSubmitterContext;
+
+    public Factory(EventSubmitterContext eventSubmitterContext) {
+      this(eventSubmitterContext, DEFAULT_OPTS);
+    }
+
+    public Factory(EventSubmitterContext eventSubmitterContext, 
ActivityOptions opts) {
+      this.submitGTEActivity = 
Workflow.newActivityStub(SubmitGTEActivity.class, opts);
+      this.eventSubmitterContext = eventSubmitterContext;
+    }
+
+    public TemporalEventTimer create(String eventName, Instant startTime) {
+      GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName, 
eventSubmitterContext.getNamespace());
+      return new TemporalEventTimer(submitGTEActivity, eventBuilder, 
this.eventSubmitterContext, startTime);
+    }
+
+    public TemporalEventTimer create(String eventName) {
+      return create(eventName, getCurrentTime());
+    }
+
+    /**
+     * Utility for creating a timer that emits separate events at the start 
and end of a job. This imitates the behavior in
+     * {@link org.apache.gobblin.runtime.AbstractJobLauncher} and emits events 
that are compatible with the
+     * {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to 
update GaaS flow statuses
+     *
+     * @return a timer that emits an event at the beginning of the job and a 
completion event at the end of the job
+     */
+    public TemporalEventTimer createJobTimer() {
+      TemporalEventTimer startTimer = 
create(TimingEvent.LauncherTimings.JOB_START);
+      startTimer.stop(Instant.EPOCH); // Emit start job event containing a 
stub end time
+      return create(TimingEvent.LauncherTimings.JOB_COMPLETE, 
startTimer.startTime);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 829b97e1f..ab6954508 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -17,21 +17,6 @@
 
 package org.apache.gobblin.temporal.yarn;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,32 +35,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
-import lombok.AccessLevel;
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
-import org.apache.gobblin.metrics.MultiReporterException;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
-import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
-import org.apache.gobblin.yarn.GobblinYarnEventConstants;
-import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
-import org.apache.gobblin.yarn.YarnHelixUtils;
-import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
-import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
-import org.apache.gobblin.yarn.event.NewContainerRequest;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -107,6 +66,49 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
 
 /**
  * This class is responsible for all Yarn-related stuffs including 
ApplicationMaster registration,
@@ -683,18 +685,10 @@ class YarnService extends AbstractIdleService {
       LOGGER.info("Adding instance {} to the pool of unused instances", 
completedInstanceName);
       this.unusedHelixInstanceNames.add(completedInstanceName);
 
-      if (this.eventSubmitter.isPresent()) {
-        this.eventSubmitter.get()
-            
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build());
-      }
+      // NOTE: logic for handling container failure is removed because 
original implementation relies on the auto scaling manager
+      // to control the number of containers by polling helix for the current 
number of tasks
+      // Without that integration, that code requests too many containers when 
there are exceptions and overloads yarn
     }
-    Optional<Resource> newContainerResource = completedContainerInfo != null ?
-        Optional.of(completedContainerInfo.getContainer().getResource()) : 
Optional.absent();
-    LOGGER.info("Requesting a new container to replace {} to run Helix 
instance {} with helix tag {} and resource {}",
-        containerStatus.getContainerId(), completedInstanceName, helixTag, 
newContainerResource.orNull());
-    this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) && 
completedContainerInfo != null ?
-            Optional.of(completedContainerInfo.getContainer()) : 
Optional.absent(), newContainerResource));
   }
 
   private boolean handleAbortedContainer(ContainerStatus containerStatus, 
ContainerInfo completedContainerInfo,
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
new file mode 100644
index 000000000..91c960eda
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+public class EventSubmitterContextTest {
+  private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private final String NAMESPACE = "test-namespace";
+  @Test
+  public void testCreateEventSubmitter()
+      throws IOException {
+    List<Tag<?>> tags = Arrays.asList(new Tag<>("jobId", "stub"));
+    State state = new State();
+    state.setProp("someState", "stub");
+    MetricContext metricContext = Instrumented.getMetricContext(state, 
getClass(), tags);
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext, 
NAMESPACE).build();
+    EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext(eventSubmitter);
+    byte[] asBytes = OBJECT_MAPPER.writeValueAsBytes(eventSubmitterContext);
+
+    EventSubmitterContext deserEventMetadata = 
OBJECT_MAPPER.readValue(asBytes, EventSubmitterContext.class);
+    EventSubmitter deserEventSubmitter = deserEventMetadata.create();
+    Assert.assertTrue(deserEventSubmitter.getTags().contains(tags.get(0)));
+    Assert.assertEquals(deserEventSubmitter.getNamespace(), NAMESPACE);
+
+    Map<? extends String, String> tagMap = 
Tag.toMap(Tag.tagValuesToString(eventSubmitter.getTags()));
+    Assert.assertEquals(tagMap.get(CLASS_META), this.getClass().getName());
+    
Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_ID_TAG_NAME));
+    
Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_NAME_TAG_NAME));
+  }
+}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 391b160fd..d76405aeb 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -104,11 +104,14 @@ import org.apache.gobblin.cluster.GobblinHelixConstants;
 import org.apache.gobblin.cluster.GobblinHelixMessagingService;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 import org.apache.gobblin.rest.JobExecutionInfoServer;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.util.AzkabanTags;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.EmailUtils;
@@ -121,6 +124,7 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
 import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
 
+import static org.apache.gobblin.metrics.GobblinMetrics.METRICS_STATE_CUSTOM_TAGS;
 import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
 
 
@@ -1051,8 +1055,15 @@ public class GobblinYarnAppLauncher {
       Schema schema = KafkaReporterUtils.getMetricReportSchema();
       String schemaId = registry.register(schema, KafkaReporterUtils.getMetricsTopic(properties).get());
       LOGGER.info("Adding schemaId {} for MetricReport to the config", schemaId);
-      config = config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
-          ConfigValueFactory.fromAnyRef(schemaId));
+      List<Tag<?>> tags = Lists.newArrayList();
+      tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+      GobblinMetrics.addCustomTagsToProperties(properties, tags);
+
+      config = config
+          .withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+              ConfigValueFactory.fromAnyRef(schemaId))
+          .withValue(METRICS_STATE_CUSTOM_TAGS,
+              ConfigValueFactory.fromAnyRef(properties.getProperty(METRICS_STATE_CUSTOM_TAGS)));
     }
     return config;
   }

Reply via email to