This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3712ec5  [GOBBLIN-1047] Add Helix and Yarn container metadata to all 
task events emitted by Gobblin Helix tasks[]
3712ec5 is described below

commit 3712ec5958d803b8f1a726ab91154f92b0a158a8
Author: sv2000 <[email protected]>
AuthorDate: Tue Feb 11 15:56:40 2020 -0800

    [GOBBLIN-1047] Add Helix and Yarn container metadata to all task events 
emitted by Gobblin Helix tasks[]
    
    Closes #2887 from sv2000/containerInfo
---
 .../gobblin/configuration/ConfigurationKeys.java   |  6 +++
 .../runtime/NoopTaskEventMetadataGenerator.java    | 41 +++++++++++++++++
 .../runtime/api/TaskEventMetadataGenerator.java    | 34 ++++++++++++++
 .../gobblin/util/TaskEventMetadataUtils.java       | 42 +++++++++++++++++
 .../cluster/HelixTaskEventMetadataGenerator.java   | 53 ++++++++++++++++++++++
 .../HelixTaskEventMetadataGeneratorTest.java       | 52 +++++++++++++++++++++
 .../extract/kafka/KafkaExtractorStatsTracker.java  |  9 +++-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 11 +++--
 .../main/java/org/apache/gobblin/runtime/Task.java | 10 +++-
 .../org/apache/gobblin/runtime/task/TaskUtils.java |  2 +-
 10 files changed, 253 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index df86caa..e8c04c2 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -988,4 +988,10 @@ public class ConfigurationKeys {
   public static final String AVRO_SCHEMA_CHECK_STRATEGY = 
"avro.schema.check.strategy";
   public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT =
       "org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";
+
+  /**
+   * Configuration for emitting task events
+   */
+  public static final String TASK_EVENT_METADATA_GENERATOR_CLASS_KEY = 
"gobblin.task.event.metadata.generator.class";
+  public static final String DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY = 
"nooptask";
 }
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/NoopTaskEventMetadataGenerator.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/NoopTaskEventMetadataGenerator.java
new file mode 100644
index 0000000..d4958d6
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/NoopTaskEventMetadataGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+
+@Alias("nooptask")
+public class NoopTaskEventMetadataGenerator implements 
TaskEventMetadataGenerator  {
+  /**
+   * Generate a map of additional metadata for the specified event name.
+   *
+   * @param taskState the task state; ignored by this no-op implementation
+   * @param eventName the event name used to determine which additional metadata should be emitted
+   * @return {@link Map} with the additional metadata
+   */
+  @Override
+  public Map<String, String> getMetadata(State taskState, String eventName) {
+    return ImmutableMap.of();
+  }
+}
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/TaskEventMetadataGenerator.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/TaskEventMetadataGenerator.java
new file mode 100644
index 0000000..21686ee
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/TaskEventMetadataGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.api;
+
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * For generating additional event metadata to associate with Task Events.
+ */
+public interface TaskEventMetadataGenerator {
+  /**
+   * Generate a map of additional metadata for the specified event name.
+   * @param eventName the event name used to determine which additional metadata should be emitted
+   * @return {@link Map} with the additional metadata
+   */
+  Map<String, String> getMetadata(State taskState, String eventName);
+}
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/util/TaskEventMetadataUtils.java 
b/gobblin-api/src/main/java/org/apache/gobblin/util/TaskEventMetadataUtils.java
new file mode 100644
index 0000000..5335cad
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/util/TaskEventMetadataUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+
+public class TaskEventMetadataUtils {
+  /**
+   * Construct a {@link TaskEventMetadataGenerator} from the task state.
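+   * @param taskState state from which the generator class name or alias is read (the default alias is used when unset)
+   * @return a new instance of the resolved {@link TaskEventMetadataGenerator}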
+   */
+  public static TaskEventMetadataGenerator getTaskEventMetadataGenerator(State 
taskState) {
+    String taskEventMetadataGeneratorClassName =
+        
taskState.getProp(ConfigurationKeys.TASK_EVENT_METADATA_GENERATOR_CLASS_KEY,
+            ConfigurationKeys.DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY);
+    ClassAliasResolver<TaskEventMetadataGenerator> aliasResolver =
+        new ClassAliasResolver<>(TaskEventMetadataGenerator.class);
+    try {
+      return 
aliasResolver.resolveClass(taskEventMetadataGeneratorClassName).newInstance();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Could not construct TaskEventMetadataGenerator " +
+          taskEventMetadataGeneratorClassName, e);
+    }
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGenerator.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGenerator.java
new file mode 100644
index 0000000..057dfcb
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGenerator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+
+@Alias("helixtask")
+public class HelixTaskEventMetadataGenerator implements 
TaskEventMetadataGenerator  {
+  public static final String HELIX_INSTANCE_KEY = "helixInstance";
+  public static final String HOST_NAME_KEY = "containerNode";
+  public static final String HELIX_JOB_ID_KEY = "helixJobId";
+  public static final String HELIX_TASK_ID_KEY = "helixTaskId";
+  public static final String CONTAINER_ID_KEY = "containerId";
+  /**
+   * Generate a map of additional metadata for the specified event name. For tasks running in a Gobblin cluster,
+   * container info such as the containerId and the host name where the task is running is added to each event.
+   *
+   * @param taskState the task state from which the Helix instance name, Helix job/task ids, host name and container id are read
+   * @param eventName the event name used to determine which additional metadata should be emitted
+   * @return {@link Map} with the additional metadata
+   */
+  @Override
+  public Map<String, String> getMetadata(State taskState, String eventName) {
+    String helixInstanceName = 
taskState.getProp(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, "");
+    String helixJobId = 
taskState.getProp(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, "");
+    String helixTaskId = 
taskState.getProp(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, "");
+    String hostName = 
taskState.getProp(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, 
"");
+    String containerId = 
taskState.getProp(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, "");
+
+    return ImmutableMap.of(HELIX_INSTANCE_KEY, helixInstanceName, 
HOST_NAME_KEY, hostName, HELIX_JOB_ID_KEY, helixJobId,
+        HELIX_TASK_ID_KEY, helixTaskId, CONTAINER_ID_KEY, containerId);
+  }
+}
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGeneratorTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGeneratorTest.java
new file mode 100644
index 0000000..c190db7
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixTaskEventMetadataGeneratorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+
+@Test
+public class HelixTaskEventMetadataGeneratorTest {
+
+  public void testGetMetadata() {
+    State state = new State();
+    state.setProp(ConfigurationKeys.TASK_EVENT_METADATA_GENERATOR_CLASS_KEY, 
"helixtask");
+    state.setProp(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, 
"container-1");
+    state.setProp(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, "task-1");
+    state.setProp(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, "job-1");
+
+    TaskEventMetadataGenerator metadataGenerator = 
TaskEventMetadataUtils.getTaskEventMetadataGenerator(state);
+    //Ensure instantiation is done correctly
+    Assert.assertTrue(metadataGenerator != null);
+
+    //Ensure metadata map is correctly populated
+    Map<String, String> metadataMap = metadataGenerator.getMetadata(state, 
"testEventName");
+    Assert.assertEquals(metadataMap.size(), 5);
+    
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HELIX_INSTANCE_KEY),
 "");
+    
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.CONTAINER_ID_KEY),
 "container-1");
+    
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HOST_NAME_KEY),
 "");
+    
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HELIX_TASK_ID_KEY),
 "task-1");
+    
Assert.assertEquals(metadataMap.get(HelixTaskEventMetadataGenerator.HELIX_JOB_ID_KEY),
 "job-1");
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 60ebb85..712d7b0 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -31,6 +31,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
 
 
 /**
@@ -66,6 +68,7 @@ public class KafkaExtractorStatsTracker {
   private final Map<KafkaPartition, ExtractorStats> statsMap;
   private final Set<Integer> errorPartitions;
   private final WorkUnitState workUnitState;
+  private final TaskEventMetadataGenerator taskEventMetadataGenerator;
   private boolean isSlaConfigured;
   private long recordLevelSlaMillis;
 
@@ -85,6 +88,7 @@ public class KafkaExtractorStatsTracker {
       this.isSlaConfigured = true;
       this.recordLevelSlaMillis = 
TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
     }
+    this.taskEventMetadataGenerator = 
TaskEventMetadataUtils.getTaskEventMetadataGenerator(workUnitState);
   }
 
   public int getErrorPartitionCount() {
@@ -323,8 +327,9 @@ public class KafkaExtractorStatsTracker {
           .put(this.partitions.get(i), createTagsForPartition(i, lowWatermark, 
highWatermark, nextWatermark));
     }
     for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : 
tagsForPartitionsMap.entrySet()) {
-      new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE).build()
-          .submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, 
eventTags.getValue());
+      EventSubmitter.Builder eventSubmitterBuilder = new 
EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
+      
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
 KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
+      
eventSubmitterBuilder.build().submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, 
eventTags.getValue());
     }
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index ebd3be9..016998d 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.JobEvent;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
 import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskIFaceWrapper;
 import org.apache.gobblin.runtime.task.TaskUtils;
@@ -61,6 +62,7 @@ import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
 
@@ -97,6 +99,7 @@ public class GobblinMultiTaskAttempt {
   private final Optional<String> containerIdOptional;
   private final Optional<StateStore<TaskState>> taskStateStoreOptional;
   private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+  private final TaskEventMetadataGenerator taskEventMetadataGenerator;
   @Setter
   private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) -> 
false;
   private List<Task> tasks;
@@ -123,6 +126,7 @@ public class GobblinMultiTaskAttempt {
         LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" 
+ containerIdOptional.or("noattempt"));
     this.jobBroker = jobBroker;
     this.tasks = new ArrayList<>();
+    this.taskEventMetadataGenerator = 
TaskEventMetadataUtils.getTaskEventMetadataGenerator(jobState);
   }
 
   /**
@@ -413,9 +417,10 @@ public class GobblinMultiTaskAttempt {
       }
     }
 
-    new EventSubmitter.Builder(JobMetrics.get(this.jobId, new 
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
-        "gobblin.runtime").build()
-        .submit(JobEvent.TASKS_SUBMITTED, "tasksCount", 
Long.toString(countDownLatch.getRegisteredParties()));
+    EventSubmitter.Builder eventSubmitterBuilder = new 
EventSubmitter.Builder(JobMetrics.get(this.jobId, new 
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
+        "gobblin.runtime");
+    
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
 JobEvent.TASKS_SUBMITTED));
+    eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, 
"tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
 
     return tasks;
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index ccb38f5..ecb7c66 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -75,6 +75,7 @@ import org.apache.gobblin.publisher.SingleTaskDataPublisher;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
 import org.apache.gobblin.records.RecordStreamProcessor;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
 import org.apache.gobblin.runtime.fork.AsynchronousFork;
 import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.runtime.fork.SynchronousFork;
@@ -86,6 +87,7 @@ import org.apache.gobblin.source.extractor.StreamingExtractor;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
 import org.apache.gobblin.writer.AcknowledgableWatermark;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
@@ -156,6 +158,7 @@ public class Task implements TaskIFace {
   private final List<RecordStreamProcessor<?,?,?,?>> recordStreamProcessors;
 
   private final Closer closer;
+  private final TaskEventMetadataGenerator taskEventMetadataGenerator;
 
   private long startTime;
   private volatile long lastRecordPulledTimestampMillis;
@@ -267,6 +270,7 @@ public class Task implements TaskIFace {
       this.watermarkTracker = Optional.absent();
       this.watermarkStorage = Optional.absent();
     }
+    this.taskEventMetadataGenerator = 
TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
   }
 
   /**
@@ -568,6 +572,7 @@ public class Task implements TaskIFace {
     FailureEventBuilder failureEvent = new 
FailureEventBuilder(FAILED_TASK_EVENT);
     failureEvent.setRootCause(t);
     failureEvent.addMetadata(TASK_STATE, this.taskState.toString());
+    
failureEvent.addAdditionalMetadata(this.taskEventMetadataGenerator.getMetadata(this.taskState,
 failureEvent.getName()));
     failureEvent.submit(taskContext.getTaskMetrics().getMetricContext());
   }
 
@@ -985,9 +990,12 @@ public class Task implements TaskIFace {
   protected void submitTaskCommittedEvent() {
     MetricContext taskMetricContext = 
TaskMetrics.get(this.taskState).getMetricContext();
     EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(taskMetricContext, "gobblin.runtime.task").build();
-    eventSubmitter.submit(TaskEvent.TASK_COMMITTED_EVENT_NAME, ImmutableMap
+    Map<String, String> metadataMap = Maps.newHashMap();
+    
metadataMap.putAll(this.taskEventMetadataGenerator.getMetadata(this.taskState, 
TaskEvent.TASK_COMMITTED_EVENT_NAME));
+    metadataMap.putAll(ImmutableMap
         .of(TaskEvent.METADATA_TASK_ID, this.taskId, 
TaskEvent.METADATA_TASK_ATTEMPT_ID,
             this.taskState.getTaskAttemptId().or("")));
+    eventSubmitter.submit(TaskEvent.TASK_COMMITTED_EVENT_NAME, metadataMap);
   }
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
index 6dfc5df..31623f9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/task/TaskUtils.java
@@ -18,7 +18,7 @@
 package org.apache.gobblin.runtime.task;
 
 import com.google.common.base.Optional;
-import java.util.Properties;
+
 import org.apache.gobblin.configuration.State;
 
 

Reply via email to