This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 96e7b1620 [GOBBLIN-1989] Temporal: Implementation of GTE for GaaS
Observability Event in MR alternative for distcp (#3843)
96e7b1620 is described below
commit 96e7b162074df9b01207aa5fd0c864af72974c0c
Author: Matthew Ho <[email protected]>
AuthorDate: Wed Jan 31 14:03:26 2024 -0500
[GOBBLIN-1989] Temporal: Implementation of GTE for GaaS Observability Event
in MR alternative for distcp (#3843)
[GOBBLIN-1989] Temporal: Implementation of GTE for GaaS Observability Event
in MR deprecation for distcp
---
.../cluster/HelixRetriggeringJobCallable.java | 3 +-
.../apache/gobblin/instrumented/Instrumented.java | 3 +-
.../main/java/org/apache/gobblin/metrics/Tag.java | 6 +-
.../gobblin/metrics/event/EventSubmitter.java | 13 ++-
.../gobblin/metrics/event/GobblinEventBuilder.java | 12 ++-
.../apache/gobblin/metrics/event/TimingEvent.java | 13 ++-
.../java/org/apache/gobblin/metrics/TagTest.java | 21 ++++
.../azkaban/AzkabanGobblinYarnAppLauncher.java | 6 +-
.../gobblin/runtime/AbstractJobLauncher.java | 2 +-
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 2 +-
.../apache/gobblin/runtime/util/JobMetrics.java | 1 +
.../temporal/GobblinTemporalConfigurationKeys.java | 12 +++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 16 ++-
.../temporal/ddm/work/WUProcessingSpec.java | 14 ++-
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 3 +-
.../ddm/workflow/ProcessWorkUnitsWorkflow.java | 1 +
.../impl/ProcessWorkUnitsWorkflowImpl.java | 78 ++++++++++++++-
.../joblauncher/GobblinTemporalJobLauncher.java | 6 +-
.../workflow/AbstractNestingExecWorkflowImpl.java | 4 +-
.../workflows/helloworld/GreetingWorkflow.java | 8 +-
.../workflows/helloworld/GreetingWorkflowImpl.java | 23 +++--
.../helloworld/HelloWorldJobLauncher.java | 6 +-
.../workflows/helloworld/HelloWorldWorker.java | 10 +-
.../workflows/metrics/EventSubmitterContext.java | 84 ++++++++++++++++
.../temporal/workflows/metrics/EventTimer.java | 58 +++++++++++
.../SubmitGTEActivity.java} | 20 ++--
.../SubmitGTEActivityImpl.java} | 24 ++---
.../workflows/metrics/TemporalEventTimer.java | 108 +++++++++++++++++++++
.../apache/gobblin/temporal/yarn/YarnService.java | 98 +++++++++----------
.../metrics/EventSubmitterContextTest.java | 61 ++++++++++++
.../gobblin/yarn/GobblinYarnAppLauncher.java | 15 ++-
31 files changed, 618 insertions(+), 113 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 2dfb4b773..220d95a46 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -143,7 +144,7 @@ class HelixRetriggeringJobCallable implements Callable {
this.jobsMapping = jobsMapping;
this.locks = locks;
this.metricContext = metricContext;
- eventSubmitter = new EventSubmitter.Builder(this.metricContext,
"gobblin.runtime").build();
+ eventSubmitter = new EventSubmitter.Builder(this.metricContext,
JobMetrics.NAMESPACE).build();
}
private boolean isRetriggeringEnabled() {
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
index 33f4b56a8..b84acb1ae 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.instrumented;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -113,7 +114,7 @@ public class Instrumented implements Instrumentable,
Closeable {
* @param tags Additional tags to add to the returned context.
* @return A {@link org.apache.gobblin.metrics.MetricContext} with the
appropriate tags and parent.
*/
- public static MetricContext getMetricContext(State state, Class<?> klazz,
List<Tag<?>> tags) {
+ public static MetricContext getMetricContext(State state, Class<?> klazz,
Collection<Tag<?>> tags) {
int randomId = RAND.nextInt(Integer.MAX_VALUE);
List<Tag<?>> generatedTags = Lists.newArrayList();
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
index 3ce4c79c2..b836a5603 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/Tag.java
@@ -21,14 +21,15 @@ import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+
/**
* A class representing a dimension or property associated with a {@link
Taggable}.
@@ -66,6 +67,7 @@ public class Tag<T> extends AbstractMap.SimpleEntry<String,
T> {
super(key, value);
}
+ @JsonCreator
public Tag(Map.Entry<? extends String, ? extends T> entry) {
super(entry);
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 480542560..0fc4cba0e 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -17,16 +17,18 @@
package org.apache.gobblin.metrics.event;
+import java.util.List;
import java.util.Map;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import lombok.Getter;
+
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
-
-import lombok.Getter;
+import org.apache.gobblin.metrics.Tag;
/**
@@ -204,4 +206,11 @@ public class EventSubmitter {
public TimingEvent getTimingEvent(String name) {
return new TimingEvent(this, name);
}
+
+ public List<Tag<?>> getTags() {
+ return this.metricContext.isPresent() ?
+ this.metricContext.get().getTags() :
+ null;
+ }
+
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index a3c0a402d..8a626f1d4 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.metrics.event;
import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -55,9 +57,17 @@ public class GobblinEventBuilder {
}
public GobblinEventBuilder(String name, String namespace) {
+ this(name, namespace, Maps.newHashMap());
+ }
+
+ @JsonCreator
+ private GobblinEventBuilder(
+ @JsonProperty("name") String name,
+ @JsonProperty("namespace") String namespace,
+ @JsonProperty("metadata") Map<String, String> metadata) {
this.name = name;
this.namespace = namespace;
- metadata = Maps.newHashMap();
+ this.metadata = metadata;
}
public ImmutableMap<String, String> getMetadata() {
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index ff791221a..7a0d4cf68 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -128,7 +128,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
super(name);
this.stopped = false;
this.submitter = submitter;
- this.startTime = System.currentTimeMillis();
+ this.startTime = getStartTime();
}
/**
@@ -138,6 +138,15 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
stop(Maps.<String, String>newHashMap());
}
+
+ public Long getStartTime() {
+ return System.currentTimeMillis();
+ }
+
+ public Long getEndTime() {
+ return System.currentTimeMillis();
+ }
+
/**
* Stop the timer and submit the event, along with the additional metadata
specified. If the timer was already stopped
* before, this is a no-op.
@@ -162,7 +171,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
}
this.stopped = true;
- this.endTime = System.currentTimeMillis();
+ this.endTime = getEndTime();
this.duration = this.endTime - this.startTime;
this.metadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
index ac03e523e..eb20d5327 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TagTest.java
@@ -17,9 +17,16 @@
package org.apache.gobblin.metrics;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* Unit tests for {@link Tag}.
@@ -34,6 +41,8 @@ public class TagTest {
private static final String PROJECT_VERSION_KEY = "project.version";
private static final int PROJECT_VERSION = 1;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
@Test
public void testTags() {
Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
@@ -44,4 +53,16 @@ public class TagTest {
Assert.assertEquals(projectVersionTag.getKey(), PROJECT_VERSION_KEY);
Assert.assertEquals(projectVersionTag.getValue().intValue(),
PROJECT_VERSION);
}
+
+ @Test
+ public void testSerde() throws IOException {
+ Tag<String> jobIdTag = new Tag<String>(JOB_ID_KEY, JOB_ID);
+ Tag<Integer> projectVersionTag = new Tag<Integer>(PROJECT_VERSION_KEY,
PROJECT_VERSION);
+ List<Tag<?>> tags = Arrays.asList(jobIdTag, projectVersionTag);
+ String bytes = objectMapper.writeValueAsString(tags);
+
+ JavaType type =
objectMapper.getTypeFactory().constructCollectionType(List.class, Tag.class);
+ List<Tag<?>> deserTags = objectMapper.readValue(bytes, type);
+ Assert.assertEquals(deserTags, tags);
+ }
}
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 8fe09e5a5..db279355f 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -135,10 +135,6 @@ public class AzkabanGobblinYarnAppLauncher extends
AbstractJob {
@Override
public void cancel() throws Exception {
- try {
- this.gobblinYarnAppLauncher.stop();
- } finally {
- super.cancel();
- }
+ this.gobblinYarnAppLauncher.stop();
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index fbd4294de..690dc113b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -1027,7 +1027,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
* Build the {@link EventSubmitter} for this class.
*/
private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> tags) {
- return new EventSubmitter.Builder(this.runtimeMetricContext,
"gobblin.runtime")
+ return new EventSubmitter.Builder(this.runtimeMetricContext,
JobMetrics.NAMESPACE)
.addMetadata(Tag.toMap(Tag.tagValuesToString(tags))).build();
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 7ebb77d11..d8ce6f12a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -502,7 +502,7 @@ public class GobblinMultiTaskAttempt {
}
EventSubmitter.Builder eventSubmitterBuilder = new
EventSubmitter.Builder(JobMetrics.get(this.jobId, new
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
- "gobblin.runtime");
+ JobMetrics.NAMESPACE);
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
JobEvent.TASKS_SUBMITTED));
eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED,
"tasksCount", Integer.toString(tasks.size()));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 03146e1df..bb8bac9b8 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.util.ClustersNames;
*/
@Slf4j
public class JobMetrics extends GobblinMetrics {
+ public static final String NAMESPACE = "gobblin.runtime";
public static final CreatorTag DEFAULT_CREATOR_TAG = new CreatorTag(
"driver");
protected final String jobName;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index f238ffc9b..182ce7daa 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -43,6 +43,18 @@ public interface GobblinTemporalConfigurationKeys {
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
+ /**
+ * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing
collisions with prod jobs
+ * during testing
+ *
+ */
+ String GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = PREFIX + "job.metrics.suffix";
+ /**
+ * Default suffix for metrics emitted by GobblinTemporalJobLauncher, used to
prevent collisions with prod jobs.
+ * It is not empty because Temporal is still in alpha stages and should not
accidentally affect a prod job.
+ */
+ String DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX = "-temporal";
+
/**
* Number of worker processes to spin up per task runner
* NOTE: If this size is too large, your container can OOM and halt
execution unexpectedly. It's recommended not to touch
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 14a4bacf4..f7040f79f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,20 +17,24 @@
package org.apache.gobblin.temporal.ddm.launcher;
-import io.temporal.client.WorkflowOptions;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
import com.typesafe.config.ConfigFactory;
-import org.apache.hadoop.fs.Path;
+import io.temporal.client.WorkflowOptions;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
@@ -84,6 +88,12 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
}
Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec,
Help.loadFileSystem(wuSpec)));
+
+ wuSpec.setTags(GobblinMetrics.getCustomTagsFromState(new
State(jobProps)));
+ wuSpec.setMetricsSuffix(this.jobProps.getProperty(
+ GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));
+
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 3b2597194..5e3ba3dd1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,15 +18,23 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.hadoop.fs.Path;
+
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -44,6 +52,8 @@ public class WUProcessingSpec implements FileSystemApt,
FileSystemJobStateful {
@NonNull private URI fileSystemUri;
@NonNull private String workUnitsDir;
@NonNull private Tuning tuning = Tuning.DEFAULT;
+ @NonNull private List<Tag<?>> tags = new ArrayList<>();
+ @NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
@JsonIgnore // (because no-arg method resembles 'java bean property')
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index d425bbc4b..99c4d0536 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -29,6 +29,7 @@ import
org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
@@ -47,7 +48,7 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new ProcessWorkUnitImpl(), new
CommitActivityImpl() };
+ return new Object[] { new ProcessWorkUnitImpl(), new
CommitActivityImpl(), new SubmitGTEActivityImpl() };
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index ba2ccf99a..eccad9bd5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.temporal.ddm.workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
+
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 141f22057..531a018c7 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,7 +16,18 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.jetbrains.annotations.NotNull;
import com.typesafe.config.ConfigFactory;
@@ -25,17 +36,26 @@ import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
-import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
@Slf4j
@@ -45,6 +65,32 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
@Override
public int process(WUProcessingSpec workSpec) {
+ try {
+ EventSubmitterContext eventSubmitterContext =
getEventSubmitterContext(workSpec);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ try (EventTimer timer = timerFactory.createJobTimer()) {
+ return performWork(workSpec);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @NotNull
+ private EventSubmitterContext getEventSubmitterContext(WUProcessingSpec
workSpec)
+ throws IOException {
+ // NOTE: We are using the metrics tags from Job Props to create the metric
context for the timer and NOT
+ // the deserialized jobState from HDFS that is created by the real distcp
job. This is because the AZ runtime
+ // settings we want are for the job launcher that launched this Yarn job.
+ FileSystem fs = Help.loadFileSystemForce(workSpec);
+ JobState jobState = Help.loadJobStateUncached(workSpec, fs);
+ List<Tag<?>> tagsFromCurrentJob = workSpec.getTags();
+ String metricsSuffix = workSpec.getMetricsSuffix();
+ List<Tag<?>> tags = getTags(tagsFromCurrentJob, metricsSuffix, jobState);
+ return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+ }
+
+ private int performWork(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
int workunitsProcessed = processingWorkflow.performWorkload(
@@ -85,4 +131,34 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
}
+
+ private List<Tag<?>> getTags(List<Tag<?>> tagsFromCurJob, String
metricsSuffix, JobState jobStateFromHdfs) {
+ // Construct new tags list by combining subset of tags on HDFS job state
and the rest of the fields from the current job
+ Map<String, Tag<?>> tagsMap = new HashMap<>();
+ Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+ TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+ TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+ // Step 1. Add tags from the AZ props using the original job (the one that
launched this yarn app)
+ tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+ // Step 2. Add tags from the jobState (the original MR job on HDFS)
+ List<String> targetKeysToAddSuffix =
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+ .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+ .forEach(tag -> {
+ // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD and
FLOW_GROUP_FIELD to prevent collisions when testing
+ String value = targetKeysToAddSuffix.contains(tag.getKey())
+ ? tag.getValue() + metricsSuffix
+ : String.valueOf(tag.getValue());
+ tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+ });
+
+ // Step 3: Overwrite any pre-existing metadata with name of the current
caller
+ tagsMap.put(GobblinMetricsKeys.CLASS_META, new
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+ return new ArrayList<>(tagsMap.values());
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index c6478b95f..f7d7255ed 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -23,10 +23,11 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
-import lombok.extern.slf4j.Slf4j;
+import io.temporal.workflow.Workflow;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -55,8 +56,9 @@ import static
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClien
* </p>
*/
@Alpha
-@Slf4j
public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
+ private static final Logger log =
Workflow.getLogger(GobblinTemporalJobLauncher.class);
+
protected WorkflowServiceStubs workflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 0dcf19a77..6bef7a609 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -25,7 +25,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -33,6 +32,7 @@ import io.temporal.workflow.Async;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -109,7 +109,7 @@ public abstract class
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
String thisWorkflowId = Workflow.getInfo().getWorkflowId();
String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" +
childAddr;
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
- .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setWorkflowId(childWorkflowId)
.build();
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
index 2bcf421b0..c653ce326 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
@@ -20,13 +20,19 @@ package org.apache.gobblin.temporal.workflows.helloworld;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
@WorkflowInterface
public interface GreetingWorkflow {
/**
* This is the method that is executed when the Workflow Execution is
started. The Workflow
* Execution completes when this method finishes execution.
+ *
+ * This method also shows an example of metrics emission using the {@link
EventSubmitter} seen in
+ * non-Temporal Gobblin code.
*/
@WorkflowMethod
- String getGreeting(String name);
+ String getGreeting(String name, EventSubmitterContext
eventSubmitterContext);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 774b77e15..9d2636036 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -19,11 +19,19 @@ package org.apache.gobblin.temporal.workflows.helloworld;
import java.time.Duration;
+import org.slf4j.Logger;
+
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
public class GreetingWorkflowImpl implements GreetingWorkflow {
+ private final Logger LOG = Workflow.getLogger(GreetingWorkflowImpl.class);
+
/*
* At least one of the following options needs to be defined:
* - setStartToCloseTimeout
@@ -41,16 +49,19 @@ public class GreetingWorkflowImpl implements
GreetingWorkflow {
*
* The activity options that were defined above are passed in as a
parameter.
*/
- private final FormatActivity activity =
Workflow.newActivityStub(FormatActivity.class, options);
+ private final FormatActivity formatActivity =
Workflow.newActivityStub(FormatActivity.class, options);
// This is the entry point to the Workflow.
@Override
- public String getGreeting(String name) {
-
+ public String getGreeting(String name, EventSubmitterContext
eventSubmitterContext) {
/**
- * If there were other Activity methods they would be orchestrated
here or from within other Activities.
- * This is a blocking call that returns only after the activity has
completed.
+ * Example of the {@link TemporalEventTimer.Factory} invoking child
activity for instrumentation.
*/
- return activity.composeGreeting(name);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ try (TemporalEventTimer timer =
timerFactory.create("getGreetingTime")) {
+ LOG.info("Executing getGreeting");
+ timer.addMetadata("name", name);
+ return formatActivity.composeGreeting(name);
+ }
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 33183b63f..5d196fae4 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
/**
@@ -56,6 +57,9 @@ public class HelloWorldJobLauncher extends
GobblinTemporalJobLauncher {
public void submitJob(List<WorkUnit> workunits) {
WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(queueName).build();
GreetingWorkflow greetingWorkflow =
this.client.newWorkflowStub(GreetingWorkflow.class, options);
- greetingWorkflow.getGreeting("Gobblin");
+ EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext(this.eventSubmitter);
+
+ String greeting = greetingWorkflow.getGreeting("Gobblin",
eventSubmitterContext);
+ log.info(greeting);
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
index 274daee9c..d5d01cc33 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldWorker.java
@@ -22,6 +22,7 @@ import com.typesafe.config.Config;
import io.temporal.client.WorkflowClient;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
public class HelloWorldWorker extends AbstractTemporalWorker {
@@ -31,11 +32,16 @@ public class HelloWorldWorker extends
AbstractTemporalWorker {
@Override
protected Class<?>[] getWorkflowImplClasses() {
- return new Class[] { GreetingWorkflowImpl.class };
+ return new Class[] {
+ GreetingWorkflowImpl.class,
+ };
}
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new FormatActivityImpl() };
+ return new Object[] {
+ new FormatActivityImpl(),
+ new SubmitGTEActivityImpl()
+ };
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
new file mode 100644
index 000000000..f0db7e7ab
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+
+/**
+ * Wrapper for sending the core essence of an {@link EventSubmitter} over the
wire (e.g. metadata tags, namespace).
+ * This is in lieu of sending the entire {@link EventSubmitter} object over
the wire, which is not serializable without
+ * losing some information, such as the gauges
+ */
+@Getter
+public class EventSubmitterContext {
+ private final List<Tag<?>> tags;
+ private final String namespace;
+ private final Class callerClass;
+
+ @JsonCreator
+ private EventSubmitterContext(
+ @JsonProperty("tags") List<Tag<?>> tags,
+ @JsonProperty("namespace") String namespace,
+ @JsonProperty("callerClass") Class callerClass) {
+ this.tags = tags;
+ this.namespace = namespace;
+ this.callerClass = callerClass;
+ }
+
+ public EventSubmitterContext(List<Tag<?>> tags, String namespace) {
+ // Explicitly send class over the wire to avoid any classloader issues
+ this(tags, namespace, tags.stream()
+ .filter(tag -> tag.getKey().equals(CLASS_META))
+ .findAny()
+ .map(tag -> (String) tag.getValue())
+ .map(EventSubmitterContext::resolveClass)
+ .orElse(EventSubmitterContext.class));
+ }
+
+ public EventSubmitterContext(EventSubmitter eventSubmitter) {
+ this(eventSubmitter.getTags(), eventSubmitter.getNamespace());
+ }
+
+ public EventSubmitter create() {
+ MetricContext metricContext = Instrumented.getMetricContext(new State(),
callerClass, tags);
+ return new EventSubmitter.Builder(metricContext, namespace).build();
+ }
+
+ private static Class resolveClass(String canonicalClassName) {
+ try {
+ return Class.forName(canonicalClassName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
new file mode 100644
index 000000000..677baf432
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.Closeable;
+
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * <p> A timer that can be used to track the duration of an event. This event
differs from the {@link TimingEvent} in that
+ * this class is not meant to be used outside of {@link
io.temporal.workflow.Workflow} code. We cannot use {@link TimingEvent}
+ * because it is not serializable and cannot be passed to {@link
io.temporal.workflow.Workflow} code due to the
+ * {@link EventSubmitter} field. It also relies on {@link
System#currentTimeMillis()} which is not compatible with {@link
io.temporal.workflow.Workflow}
+ * since {@link System#currentTimeMillis()} is not deterministic.</p>
+ *
+ * <p> {@link EventSubmitter} is not easily serializable because the {@link
MetricContext} field
+ * contains bi-directional relationships via the {@link
org.apache.gobblin.metrics.InnerGauge}. Although it's possible
+ * to write a custom serializer for {@link EventSubmitter}, it creates a
non-obvious sleight of hand where the EventSubmitter
+ * metadata will change when crossing {@link io.temporal.workflow.Workflow} or
{@link io.temporal.activity.Activity} boundaries. </p>
+ *
+ * <p> It differs from {@link Closeable} because the close method does not
throw {@link java.io.IOException}. {@link TimingEvent}
+ * does this but the issue is it does not implement an interface. Inheritance
is not a good solution either because of the
+ * {@link EventSubmitter} member variable. </p>
+ */
+public interface EventTimer extends Closeable {
+ /**
+ * Add additional metadata that will be used for post-processing when the
timer is stopped via {@link #stop()}
+ * @param key
+ * @param metadata
+ */
+ void addMetadata(String key, String metadata);
+
+ /**
+ * Stops the timer and executes any post-processing (e.g. event submission)
+ */
+ void stop();
+
+ default void close() {
+ stop();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
similarity index 64%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
index 2bcf421b0..aa975d10d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivity.java
@@ -15,18 +15,16 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.workflows.helloworld;
+package org.apache.gobblin.temporal.workflows.metrics;
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
-@WorkflowInterface
-public interface GreetingWorkflow {
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
- /**
- * This is the method that is executed when the Workflow Execution is
started. The Workflow
- * Execution completes when this method finishes execution.
- */
- @WorkflowMethod
- String getGreeting(String name);
+
+@ActivityInterface
+public interface SubmitGTEActivity {
+ @ActivityMethod
+ void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext
eventSubmitterContext);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
similarity index 60%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
index 2bcf421b0..63bce421a 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -15,18 +15,20 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.workflows.helloworld;
+package org.apache.gobblin.temporal.workflows.metrics;
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import org.slf4j.Logger;
-@WorkflowInterface
-public interface GreetingWorkflow {
+import io.temporal.workflow.Workflow;
- /**
- * This is the method that is executed when the Workflow Execution is
started. The Workflow
- * Execution completes when this method finishes execution.
- */
- @WorkflowMethod
- String getGreeting(String name);
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+
+public class SubmitGTEActivityImpl implements SubmitGTEActivity {
+ private static Logger log =
Workflow.getLogger(SubmitGTEActivityImpl.class);
+
+ @Override
+ public void submitGTE(GobblinEventBuilder eventBuilder,
EventSubmitterContext eventSubmitterContext) {
+ eventSubmitterContext.create().submit(eventBuilder);
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
new file mode 100644
index 000000000..0f68f1132
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.workflow.Workflow;
+import lombok.RequiredArgsConstructor;
+
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * Boiler plate for tracking elapsed time of events that is compatible with
{@link Workflow}
+ * by using activities to record time
+ *
+ * This class is very similar to {@link TimingEvent} but uses {@link Workflow}
compatible APIs. It's possible to refactor
+ * this class to inherit the {@link TimingEvent} but extra care would be
needed to remove the {@link EventSubmitter} field
+ * since that class is not serializable without losing some information
+ */
+@RequiredArgsConstructor
+public class TemporalEventTimer implements EventTimer {
+ private final SubmitGTEActivity trackingEventActivity;
+ private final GobblinEventBuilder eventBuilder;
+ private final EventSubmitterContext eventSubmitterContext;
+ private final Instant startTime;
+
+ @Override
+ public void stop() {
+ stop(getCurrentTime());
+ }
+
+ @Override
+ public void addMetadata(String key, String metadata) {
+ this.eventBuilder.addMetadata(key, metadata);
+ }
+
+
+ private void stop(Instant endTime) {
+ this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE,
TimingEvent.METADATA_TIMING_EVENT);
+ this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME,
Long.toString(this.startTime.toEpochMilli()));
+ this.eventBuilder.addMetadata(TimingEvent.METADATA_END_TIME,
Long.toString(endTime.toEpochMilli()));
+ Duration duration = Duration.between(this.startTime, endTime);
+ this.eventBuilder.addMetadata(TimingEvent.METADATA_DURATION,
Long.toString(duration.toMillis()));
+
+ trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
+ }
+
+ private static Instant getCurrentTime() {
+ return Instant.ofEpochMilli(Workflow.currentTimeMillis());
+ }
+
+ public static class Factory {
+ private static final ActivityOptions DEFAULT_OPTS =
ActivityOptions.newBuilder().build();
+ private final SubmitGTEActivity submitGTEActivity;
+ private final EventSubmitterContext eventSubmitterContext;
+
+ public Factory(EventSubmitterContext eventSubmitterContext) {
+ this(eventSubmitterContext, DEFAULT_OPTS);
+ }
+
+ public Factory(EventSubmitterContext eventSubmitterContext,
ActivityOptions opts) {
+ this.submitGTEActivity =
Workflow.newActivityStub(SubmitGTEActivity.class, opts);
+ this.eventSubmitterContext = eventSubmitterContext;
+ }
+
+ public TemporalEventTimer create(String eventName, Instant startTime) {
+ GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName,
eventSubmitterContext.getNamespace());
+ return new TemporalEventTimer(submitGTEActivity, eventBuilder,
this.eventSubmitterContext, startTime);
+ }
+
+ public TemporalEventTimer create(String eventName) {
+ return create(eventName, getCurrentTime());
+ }
+
+ /**
+ * Utility for creating a timer that emits separate events at the start
and end of a job. This imitates the behavior in
+ * {@link org.apache.gobblin.runtime.AbstractJobLauncher} and emits events
that are compatible with the
+ * {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to
update GaaS flow statuses
+ *
+ * @return a timer that emits an event at the beginning of the job and a
completion event at the end of the job
+ */
+ public TemporalEventTimer createJobTimer() {
+ TemporalEventTimer startTimer =
create(TimingEvent.LauncherTimings.JOB_START);
+ startTimer.stop(Instant.EPOCH); // Emit start job event containing a
stub end time
+ return create(TimingEvent.LauncherTimings.JOB_COMPLETE,
startTimer.startTime);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 829b97e1f..ab6954508 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -17,21 +17,6 @@
package org.apache.gobblin.temporal.yarn;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -50,32 +35,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import lombok.AccessLevel;
-import lombok.Getter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
-import org.apache.gobblin.cluster.GobblinClusterUtils;
-import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
-import org.apache.gobblin.metrics.MultiReporterException;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
-import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
-import org.apache.gobblin.yarn.GobblinYarnEventConstants;
-import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
-import org.apache.gobblin.yarn.YarnHelixUtils;
-import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
-import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
-import org.apache.gobblin.yarn.event.NewContainerRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -107,6 +66,49 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.yarn.GobblinYarnEventConstants;
+import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
+import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
/**
* This class is responsible for all Yarn-related stuffs including
ApplicationMaster registration,
@@ -683,18 +685,10 @@ class YarnService extends AbstractIdleService {
LOGGER.info("Adding instance {} to the pool of unused instances",
completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
- if (this.eventSubmitter.isPresent()) {
- this.eventSubmitter.get()
-
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
eventMetadataBuilder.get().build());
- }
+ // NOTE: logic for handling container failure is removed because
original implementation relies on the auto scaling manager
+ // to control the number of containers by polling helix for the current
number of tasks
+ // Without that integration, that code requests too many containers when
there are exceptions and overloads yarn
}
- Optional<Resource> newContainerResource = completedContainerInfo != null ?
- Optional.of(completedContainerInfo.getContainer().getResource()) :
Optional.absent();
- LOGGER.info("Requesting a new container to replace {} to run Helix
instance {} with helix tag {} and resource {}",
- containerStatus.getContainerId(), completedInstanceName, helixTag,
newContainerResource.orNull());
- this.eventBus.post(new NewContainerRequest(
- shouldStickToTheSameNode(containerStatus.getExitStatus()) &&
completedContainerInfo != null ?
- Optional.of(completedContainerInfo.getContainer()) :
Optional.absent(), newContainerResource));
}
private boolean handleAbortedContainer(ContainerStatus containerStatus,
ContainerInfo completedContainerInfo,
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
new file mode 100644
index 000000000..91c960eda
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContextTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+import static org.apache.gobblin.instrumented.GobblinMetricsKeys.CLASS_META;
+
+public class EventSubmitterContextTest {
+ private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final String NAMESPACE = "test-namespace";
+ @Test
+ public void testCreateEventSubmitter()
+ throws IOException {
+ List<Tag<?>> tags = Arrays.asList(new Tag<>("jobId", "stub"));
+ State state = new State();
+ state.setProp("someState", "stub");
+ MetricContext metricContext = Instrumented.getMetricContext(state,
getClass(), tags);
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(metricContext,
NAMESPACE).build();
+ EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext(eventSubmitter);
+ byte[] asBytes = OBJECT_MAPPER.writeValueAsBytes(eventSubmitterContext);
+
+ EventSubmitterContext deserEventMetadata =
OBJECT_MAPPER.readValue(asBytes, EventSubmitterContext.class);
+ EventSubmitter deserEventSubmitter = deserEventMetadata.create();
+ Assert.assertTrue(deserEventSubmitter.getTags().contains(tags.get(0)));
+ Assert.assertEquals(deserEventSubmitter.getNamespace(), NAMESPACE);
+
+ Map<? extends String, String> tagMap =
Tag.toMap(Tag.tagValuesToString(eventSubmitter.getTags()));
+ Assert.assertEquals(tagMap.get(CLASS_META), this.getClass().getName());
+
Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_ID_TAG_NAME));
+
Assert.assertTrue(tagMap.containsKey(MetricContext.METRIC_CONTEXT_NAME_TAG_NAME));
+ }
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 391b160fd..d76405aeb 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -104,11 +104,14 @@ import org.apache.gobblin.cluster.GobblinHelixConstants;
import org.apache.gobblin.cluster.GobblinHelixMessagingService;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
import org.apache.gobblin.rest.JobExecutionInfoServer;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.util.AzkabanTags;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.EmailUtils;
@@ -121,6 +124,7 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
+import static
org.apache.gobblin.metrics.GobblinMetrics.METRICS_STATE_CUSTOM_TAGS;
import static
org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
@@ -1051,8 +1055,15 @@ public class GobblinYarnAppLauncher {
Schema schema = KafkaReporterUtils.getMetricReportSchema();
String schemaId = registry.register(schema,
KafkaReporterUtils.getMetricsTopic(properties).get());
LOGGER.info("Adding schemaId {} for MetricReport to the config",
schemaId);
- config =
config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
- ConfigValueFactory.fromAnyRef(schemaId));
+ List<Tag<?>> tags = Lists.newArrayList();
+ tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+ GobblinMetrics.addCustomTagsToProperties(properties, tags);
+
+ config = config
+
.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+ ConfigValueFactory.fromAnyRef(schemaId))
+ .withValue(METRICS_STATE_CUSTOM_TAGS,
+
ConfigValueFactory.fromAnyRef(properties.getProperty(METRICS_STATE_CUSTOM_TAGS)));
}
return config;
}