This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c9c8a8  [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job 
skip event when job is dropped due to previous job is running (#3478)
8c9c8a8 is described below

commit 8c9c8a84ed23c0215c4d80125ac532e97085d76f
Author: Zihan Li <[email protected]>
AuthorDate: Thu Mar 17 14:45:24 2022 -0700

    [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job skip event when 
job is dropped due to previous job is running (#3478)
    
    * [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job skip event when 
job is dropped due to previous job is running
    
    * address typo
    
    * address comments
    
    * fix checkStyle
    
    * address comments
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   | 57 +-------------
 .../gobblin/cluster/GobblinHelixJobScheduler.java  |  6 +-
 .../cluster/HelixRetriggeringJobCallable.java      | 17 ++++-
 .../org/apache/gobblin/cluster/HelixUtils.java     | 60 +++++++++++++++
 .../apache/gobblin/metrics/event/TimingEvent.java  |  1 +
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 88 ++++++++++++++++++++++
 .../main/java/org/apache/gobblin/runtime/Task.java |  2 +
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  1 +
 8 files changed, 173 insertions(+), 59 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 911ba2c..df15ee3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -44,7 +44,6 @@ import com.github.rholder.retry.RetryException;
 import com.github.rholder.retry.Retryer;
 import com.github.rholder.retry.RetryerBuilder;
 import com.github.rholder.retry.StopStrategies;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -73,7 +72,6 @@ import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -140,7 +138,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap,
       Optional<GobblinHelixMetrics> helixMetrics) throws Exception {
 
-    super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
+    super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
     LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}", 
jobProps, appWorkDir);
     this.helixManager = helixManager;
     this.helixTaskDriver = new TaskDriver(this.helixManager);
@@ -574,57 +572,4 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       this.fs.delete(jobStateFilePath, false);
     }
   }
-
-  /**
-   * Inject in some additional properties
-   * @param jobProps job properties
-   * @param inputTags list of metadata tags
-   * @return
-   */
-  private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties 
jobProps,
-      List<? extends Tag<?>> inputTags) {
-    List<Tag<?>> metadataTags = Lists.newArrayList(inputTags);
-    String jobId;
-
-    // generate job id if not already set
-    if (jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)) {
-      jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
-    } else {
-      jobId = 
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps));
-      jobProps.put(ConfigurationKeys.JOB_ID_KEY, jobId);
-    }
-
-    String jobExecutionId = Long.toString(Id.Job.parse(jobId).getSequence());
-
-    // only inject flow tags if a flow name is defined
-    if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
-      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
-          jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
-      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
-          jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
-
-      // use job execution id if flow execution id is not present
-      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-          jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobExecutionId)));
-    }
-
-    if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
-      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
-      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
-      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
-          "false"));
-    }
-
-    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
-        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
-    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
-        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
-    metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
-
-    LOGGER.debug("GobblinHelixJobLauncher.addAdditionalMetadataTags: 
metadataTags {}", metadataTags);
-
-    return metadataTags;
-  }
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index e26f5dd..7e43c16 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -227,7 +227,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         this.jobHelixManager,
         this.taskDriverHelixManager,
         this.jobsMapping,
-        this.locks).call();
+        this.locks,
+        this.metricContext).call();
   }
 
   @Override
@@ -257,7 +258,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         this.jobHelixManager,
         this.taskDriverHelixManager,
         this.jobsMapping,
-        this.locks);
+        this.locks,
+        this.metricContext);
 
     final Future<?> future = this.jobExecutor.submit(retriggeringJob);
     return new Future() {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 735da83..37da2c0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -17,14 +17,20 @@
 
 package org.apache.gobblin.cluster;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
 
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -104,6 +110,8 @@ class HelixRetriggeringJobCallable implements Callable {
   private final String jobUri;
   private boolean jobDeleteAttempted = false;
   private final Striped<Lock> locks;
+  private final MetricContext metricContext;
+  private final EventSubmitter eventSubmitter;
 
   public HelixRetriggeringJobCallable(
       GobblinHelixJobScheduler jobScheduler,
@@ -117,7 +125,8 @@ class HelixRetriggeringJobCallable implements Callable {
       HelixManager jobHelixManager,
       Optional<HelixManager> taskDriverHelixManager,
       HelixJobsMapping jobsMapping,
-      Striped<Lock> locks) {
+      Striped<Lock> locks,
+      MetricContext metricContext) {
     this.jobScheduler = jobScheduler;
     this.jobCatalog = jobCatalog;
     this.sysProps = sysProps;
@@ -132,6 +141,8 @@ class HelixRetriggeringJobCallable implements Callable {
     this.jobUri = 
jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI);
     this.jobsMapping = jobsMapping;
     this.locks = locks;
+    this.metricContext = metricContext;
+    eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
"gobblin.runtime").build();
   }
 
   private boolean isRetriggeringEnabled() {
@@ -266,6 +277,10 @@ class HelixRetriggeringJobCallable implements Callable {
 
       try {
         if (planningJobIdFromStore.isPresent() && 
!canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
+          TimingEvent timer = new TimingEvent(eventSubmitter, 
TimingEvent.JOB_SKIPPED_TIME);
+          HashMap<String, String> metadata = new 
HashMap<>(Tag.toMap(Tag.tagValuesToString(
+              HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
+          timer.stop(metadata);
           planningJobLauncherMetrics.skippedPlanningJobs.mark();
           return;
         }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 80962b4..63149c4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,16 +17,23 @@
 
 package org.apache.gobblin.cluster;
 
+import com.google.common.collect.Lists;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -146,6 +153,59 @@ public class HelixUtils {
     log.info("Work flow {} initialized", workFlowName);
   }
 
+  /**
+   * Inject in some additional properties
+   * @param jobProps job properties
+   * @param inputTags list of metadata tags
+   * @return the input tags plus job/flow metadata tags derived from the job properties
+   */
+  public static List<? extends Tag<?>> initBaseEventTags(Properties jobProps,
+      List<? extends Tag<?>> inputTags) {
+    List<Tag<?>> metadataTags = Lists.newArrayList(inputTags);
+    String jobId;
+
+    // generate job id if not already set
+    if (jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)) {
+      jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
+    } else {
+      jobId = 
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps));
+      jobProps.put(ConfigurationKeys.JOB_ID_KEY, jobId);
+    }
+
+    String jobExecutionId = Long.toString(Id.Job.parse(jobId).getSequence());
+
+    // only inject flow tags if a flow name is defined
+    if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+
+      // use job execution id if flow execution id is not present
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobExecutionId)));
+    }
+
+    if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+          "false"));
+    }
+
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
+        jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
+    metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
+
+    log.debug("HelixUtils.initBaseEventTags: metadataTags {}", metadataTags);
+
+    return metadataTags;
+  }
+
   protected static boolean deleteTaskFromHelixJob(String workFlowName,
       String jobName, String taskID, TaskDriver helixTaskDriver) {
     try {
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 50fb856..39da68b 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -101,6 +101,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
   public static final String METADATA_MESSAGE = "message";
   public static final String JOB_ORCHESTRATED_TIME = "jobOrchestratedTime";
   public static final String JOB_START_TIME = "jobStartTime";
+  public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
   public static final String JOB_END_TIME = "jobEndTime";
   public static final String JOB_LAST_PROGRESS_EVENT_TIME = 
"jobLastProgressEventTime";
   public static final String JOB_COMPLETION_PERCENTAGE = 
"jobCompletionPercentage";
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index fd8f157..926f1d8 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -303,6 +303,75 @@ public class KafkaAvroJobStatusMonitorTest {
   }
 
   @Test
+  public void testProcessMessageForSkippedFlow() throws IOException, 
ReflectiveOperationException {
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic2");
+
+    //Submit GobblinTrackingEvents to Kafka
+    ImmutableList.of(
+        createFlowCompiledEvent(),
+        createJobOrchestratedEvent(1),
+        createJobSkippedEvent()
+    ).forEach(event -> {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    });
+
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    MockKafkaAvroJobStatusMonitor jobStatusMonitor = new 
MockKafkaAvroJobStatusMonitor("test",config, 1);
+    jobStatusMonitor.buildMetricsContextAndMetrics();
+
+    ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    // Verify undecodeable message is skipped
+    byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
+        messageAndMetadata.message().length - 1);
+    ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC, 
messageAndMetadata.partition(),
+        messageAndMetadata.offset(), messageAndMetadata.key(), 
undecodeableMessage);
+    Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
0L);
+    jobStatusMonitor.processMessage(new 
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+    Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
1L);
+    // Re-test when properly encoded, as expected for a normal event
+    
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, 
flowName);
+    String tableName = 
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.COMPILED.name());
+
+    Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+        iterator,
+        this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, 
this.jobGroup, this.jobName);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.ORCHESTRATED.name());
+
+    jobStatusMonitor.processMessage(recordIterator.next());
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), 
ExecutionStatus.CANCELLED.name());
+  }
+
+  @Test
   public void testProcessingRetriedForApparentlyTransientErrors() throws 
IOException, ReflectiveOperationException {
     KafkaEventReporter kafkaReporter = builder.build("localhost:0000", 
"topic3");
 
@@ -433,6 +502,25 @@ public class KafkaAvroJobStatusMonitorTest {
     return event;
   }
 
+  private GobblinTrackingEvent createJobSkippedEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.JOB_SKIPPED_TIME;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, 
this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "5");
+    metadata.put(TimingEvent.METADATA_END_TIME, "6");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, 
namespace, name, metadata);
+    return event;
+  }
+
   private GobblinTrackingEvent createJobSucceededEvent() {
     String namespace = "org.apache.gobblin.metrics";
     Long timestamp = System.currentTimeMillis();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index b46feba..9c01964 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -1048,6 +1048,8 @@ public class Task implements TaskIFace {
   public synchronized boolean cancel() {
     LOG.info("Calling task cancel with interrupt flag: {}", 
this.shouldInterruptTaskOnCancel);
     if (this.taskFuture != null && 
this.taskFuture.cancel(this.shouldInterruptTaskOnCancel)) {
+      // Make sure to mark running task as done
+      this.taskStateTracker.onTaskCommitCompletion(this);
       return true;
     } else {
       return false;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index dd6b343..4c7ff29 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -160,6 +160,7 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
         break;
       case TimingEvent.FlowTimings.FLOW_CANCELLED:
       case TimingEvent.LauncherTimings.JOB_CANCEL:
+      case TimingEvent.JOB_SKIPPED_TIME:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.CANCELLED.name());
         properties.put(TimingEvent.JOB_END_TIME, 
properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;

Reply via email to