This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3712ec5 [GOBBLIN-1047] Add Helix and Yarn container metadata to all
task events emitted by Gobblin Helix tasks[]
3712ec5 is described below
commit 3712ec5958d803b8f1a726ab91154f92b0a158a8
Author: sv2000 <[email protected]>
AuthorDate: Tue Feb 11 15:56:40 2020 -0800
[GOBBLIN-1047] Add Helix and Yarn container metadata to all task events
emitted by Gobblin Helix tasks[]
Closes #2887 from sv2000/containerInfo
---
.../gobblin/configuration/ConfigurationKeys.java | 6 +++
.../runtime/NoopTaskEventMetadataGenerator.java | 41 +++++++++++++++++
.../runtime/api/TaskEventMetadataGenerator.java | 34 ++++++++++++++
.../gobblin/util/TaskEventMetadataUtils.java | 42 +++++++++++++++++
.../cluster/HelixTaskEventMetadataGenerator.java | 53 ++++++++++++++++++++++
.../HelixTaskEventMetadataGeneratorTest.java | 52 +++++++++++++++++++++
.../extract/kafka/KafkaExtractorStatsTracker.java | 9 +++-
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 11 +++--
.../main/java/org/apache/gobblin/runtime/Task.java | 10 +++-
.../org/apache/gobblin/runtime/task/TaskUtils.java | 2 +-
10 files changed, 253 insertions(+), 7 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index df86caa..e8c04c2 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -988,4 +988,10 @@ public class ConfigurationKeys {
public static final String AVRO_SCHEMA_CHECK_STRATEGY =
"avro.schema.check.strategy";
public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT =
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
+
+ /**
+ * Configuration for emitting task events
+ */
+ public static final String TASK_EVENT_METADATA_GENERATOR_CLASS_KEY =
"gobblin.task.event.metadata.generator.class";
+ public static final String DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY =
"nooptask";
}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/NoopTaskEventMetadataGenerator.java
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/NoopTaskEventMetadataGenerator.java
new file mode 100644
index 0000000..d4958d6
--- /dev/null
+++
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/NoopTaskEventMetadataGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+
+@Alias("nooptask")
+public class NoopTaskEventMetadataGenerator implements
TaskEventMetadataGenerator {
+ /**
+ * Generate a map of additional metadata for the specified event name.
+ *
+ * @param taskState
+ * @param eventName the event name used to determine which additional
metadata should be emitted
+ * @return {@link Map} with the additional metadata
+ */
+ @Override
+ public Map<String, String> getMetadata(State taskState, String eventName) {
+ return ImmutableMap.of();
+ }
+}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/TaskEventMetadataGenerator.java
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/TaskEventMetadataGenerator.java
new file mode 100644
index 0000000..21686ee
--- /dev/null
+++
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/TaskEventMetadataGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * For generating additional event metadata to associate with Task Events.
+ */
+public interface TaskEventMetadataGenerator {
+ /**
+ * Generate a map of additional metadata for the specified event name.
+ * @param eventName the event name used to determine which additional
metadata should be emitted
+ * @return {@link Map} with the additional metadata
+ */
+ Map<String, String> getMetadata(State taskState, String eventName);
+}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/util/TaskEventMetadataUtils.java
b/gobblin-api/src/main/java/org/apache/gobblin/util/TaskEventMetadataUtils.java
new file mode 100644
index 0000000..5335cad
--- /dev/null
+++
b/gobblin-api/src/main/java/org/apache/gobblin/util/TaskEventMetadataUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+
+public class TaskEventMetadataUtils {
+ /**
+ * Construct a {@link TaskEventMetadataGenerator} from the task state.
+ * @param taskState
+ * @return
+ */
+ public static TaskEventMetadataGenerator getTaskEventMetadataGenerator(State
taskState) {
+ String taskEventMetadataGeneratorClassName =
+
taskState.getProp(ConfigurationKeys.TASK_EVENT_METADATA_GENERATOR_CLASS_KEY,
+ ConfigurationKeys.DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY);
+ ClassAliasResolver<TaskEventMetadataGenerator> aliasResolver =
+ new ClassAliasResolver<>(TaskEventMetadataGenerator.class);
+ try {
+ return
aliasResolver.resolveClass(taskEventMetadataGeneratorClassName).newInstance();
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Could not construct
TaskEventMetadataGenerator " +
+ taskEventMetadataGeneratorClassName, e);
+ }
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGenerator.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGenerator.java
new file mode 100644
index 0000000..057dfcb
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGenerator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+
+@Alias("helixtask")
+public class HelixTaskEventMetadataGenerator implements
TaskEventMetadataGenerator {
+ public static final String HELIX_INSTANCE_KEY = "helixInstance";
+ public static final String HOST_NAME_KEY = "containerNode";
+ public static final String HELIX_JOB_ID_KEY = "helixJobId";
+ public static final String HELIX_TASK_ID_KEY = "helixTaskId";
+ public static final String CONTAINER_ID_KEY = "containerId";
+ /**
+ * Generate a map of additional metadata for the specified event name. For
tasks running in Gobblin cluster
+ * we add container info such as containerId, host name where the task is
running to each event.
+ *
+ * @param taskState
+ * @param eventName the event name used to determine which additional
metadata should be emitted
+ * @return {@link Map} with the additional metadata
+ */
+ @Override
+ public Map<String, String> getMetadata(State taskState, String eventName) {
+ String helixInstanceName =
taskState.getProp(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, "");
+ String helixJobId =
taskState.getProp(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, "");
+ String helixTaskId =
taskState.getProp(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, "");
+ String hostName =
taskState.getProp(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY,
"");
+ String containerId =
taskState.getProp(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, "");
+
+ return ImmutableMap.of(HELIX_INSTANCE_KEY, helixInstanceName,
HOST_NAME_KEY, hostName, HELIX_JOB_ID_KEY, helixJobId,
+ HELIX_TASK_ID_KEY, helixTaskId, CONTAINER_ID_KEY, containerId);
+ }
+}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGeneratorTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGeneratorTest.java
new file mode 100644
index 0000000..c190db7
--- /dev/null
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGeneratorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+
+@Test
+public class HelixTaskEventMetadataGeneratorTest {
+
+ public void testGetMetadata() {
+ State state = new State();
+ state.setProp(ConfigurationKeys.TASK_EVENT_METADATA_GENERATOR_CLASS_KEY,
"helixtask");
+ state.setProp(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY,
"container-1");
+ state.setProp(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, "task-1");
+ state.setProp(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, "job-1");
+
+ TaskEventMetadataGenerator metadataGenerator =
TaskEventMetadataUtils.getTaskEventMetadataGenerator(state);
+ //Ensure instantiation is done correctly
+ Assert.assertTrue(metadataGenerator != null);
+
+ //Ensure metadata map is correctly populated
+ Map<String, String> metadataMap = metadataGenerator.getMetadata(state,
"testEventName");
+ Assert.assertEquals(metadataMap.size(), 5);
+
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HELIX_INSTANCE_KEY),
"");
+
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.CONTAINER_ID_KEY),
"container-1");
+
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HOST_NAME_KEY),
"");
+
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HELIX_TASK_ID_KEY),
"task-1");
+
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HELIX_JOB_ID_KEY),
"job-1");
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 60ebb85..712d7b0 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -31,6 +31,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
/**
@@ -66,6 +68,7 @@ public class KafkaExtractorStatsTracker {
private final Map<KafkaPartition, ExtractorStats> statsMap;
private final Set<Integer> errorPartitions;
private final WorkUnitState workUnitState;
+ private final TaskEventMetadataGenerator taskEventMetadataGenerator;
private boolean isSlaConfigured;
private long recordLevelSlaMillis;
@@ -85,6 +88,7 @@ public class KafkaExtractorStatsTracker {
this.isSlaConfigured = true;
this.recordLevelSlaMillis =
TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
}
+ this.taskEventMetadataGenerator =
TaskEventMetadataUtils.getTaskEventMetadataGenerator(workUnitState);
}
public int getErrorPartitionCount() {
@@ -323,8 +327,9 @@ public class KafkaExtractorStatsTracker {
.put(this.partitions.get(i), createTagsForPartition(i, lowWatermark,
highWatermark, nextWatermark));
}
for (Map.Entry<KafkaPartition, Map<String, String>> eventTags :
tagsForPartitionsMap.entrySet()) {
- new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE).build()
- .submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME,
eventTags.getValue());
+ EventSubmitter.Builder eventSubmitterBuilder = new
EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
+
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
+
eventSubmitterBuilder.build().submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME,
eventTags.getValue());
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index ebd3be9..016998d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.JobEvent;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskIFaceWrapper;
import org.apache.gobblin.runtime.task.TaskUtils;
@@ -61,6 +62,7 @@ import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
@@ -97,6 +99,7 @@ public class GobblinMultiTaskAttempt {
private final Optional<String> containerIdOptional;
private final Optional<StateStore<TaskState>> taskStateStoreOptional;
private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+ private final TaskEventMetadataGenerator taskEventMetadataGenerator;
@Setter
private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) ->
false;
private List<Task> tasks;
@@ -123,6 +126,7 @@ public class GobblinMultiTaskAttempt {
LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-"
+ containerIdOptional.or("noattempt"));
this.jobBroker = jobBroker;
this.tasks = new ArrayList<>();
+ this.taskEventMetadataGenerator =
TaskEventMetadataUtils.getTaskEventMetadataGenerator(jobState);
}
/**
@@ -413,9 +417,10 @@ public class GobblinMultiTaskAttempt {
}
}
- new EventSubmitter.Builder(JobMetrics.get(this.jobId, new
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
- "gobblin.runtime").build()
- .submit(JobEvent.TASKS_SUBMITTED, "tasksCount",
Long.toString(countDownLatch.getRegisteredParties()));
+ EventSubmitter.Builder eventSubmitterBuilder = new
EventSubmitter.Builder(JobMetrics.get(this.jobId, new
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
+ "gobblin.runtime");
+
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
JobEvent.TASKS_SUBMITTED));
+ eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED,
"tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
return tasks;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index ccb38f5..ecb7c66 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -75,6 +75,7 @@ import org.apache.gobblin.publisher.SingleTaskDataPublisher;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
import org.apache.gobblin.records.RecordStreamProcessor;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.runtime.fork.AsynchronousFork;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.runtime.fork.SynchronousFork;
@@ -86,6 +87,7 @@ import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
import org.apache.gobblin.writer.AcknowledgableWatermark;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
@@ -156,6 +158,7 @@ public class Task implements TaskIFace {
private final List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors;
private final Closer closer;
+ private final TaskEventMetadataGenerator taskEventMetadataGenerator;
private long startTime;
private volatile long lastRecordPulledTimestampMillis;
@@ -267,6 +270,7 @@ public class Task implements TaskIFace {
this.watermarkTracker = Optional.absent();
this.watermarkStorage = Optional.absent();
}
+ this.taskEventMetadataGenerator =
TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
}
/**
@@ -568,6 +572,7 @@ public class Task implements TaskIFace {
FailureEventBuilder failureEvent = new
FailureEventBuilder(FAILED_TASK_EVENT);
failureEvent.setRootCause(t);
failureEvent.addMetadata(TASK_STATE, this.taskState.toString());
+
failureEvent.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(this.taskState,
failureEvent.getName()));
failureEvent.submit(taskContext.getTaskMetrics().getMetricContext());
}
@@ -985,9 +990,12 @@ public class Task implements TaskIFace {
protected void submitTaskCommittedEvent() {
MetricContext taskMetricContext =
TaskMetrics.get(this.taskState).getMetricContext();
EventSubmitter eventSubmitter = new
EventSubmitter.Builder(taskMetricContext, "gobblin.runtime.task").build();
- eventSubmitter.submit(TaskEvent.TASK_COMMITTED_EVENT_NAME, ImmutableMap
+ Map<String, String> metadataMap = Maps.newHashMap();
+
metadataMap.putAll(this.taskEventMetadataGenerator.getMetadata(this.taskState,
TaskEvent.TASK_COMMITTED_EVENT_NAME));
+ metadataMap.putAll(ImmutableMap
.of(TaskEvent.METADATA_TASK_ID, this.taskId,
TaskEvent.METADATA_TASK_ATTEMPT_ID,
this.taskState.getTaskAttemptId().or("")));
+ eventSubmitter.submit(TaskEvent.TASK_COMMITTED_EVENT_NAME, metadataMap);
}
/**
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
index 6dfc5df..31623f9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
@@ -18,7 +18,7 @@
package org.apache.gobblin.runtime.task;
import com.google.common.base.Optional;
-import java.util.Properties;
+
import org.apache.gobblin.configuration.State;