This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8c9c8a8 [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job
skip event when job is dropped due to previous job is running (#3478)
8c9c8a8 is described below
commit 8c9c8a84ed23c0215c4d80125ac532e97085d76f
Author: Zihan Li <[email protected]>
AuthorDate: Thu Mar 17 14:45:24 2022 -0700
[GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job skip event when
job is dropped due to previous job is running (#3478)
* [GOBBLIN-1621] Make HelixRetriggeringJobCallable emit job skip event when
job is dropped due to previous job is running
* address typo
* address comments
* fix checkStyle
* address comments
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 57 +-------------
.../gobblin/cluster/GobblinHelixJobScheduler.java | 6 +-
.../cluster/HelixRetriggeringJobCallable.java | 17 ++++-
.../org/apache/gobblin/cluster/HelixUtils.java | 60 +++++++++++++++
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
.../runtime/KafkaAvroJobStatusMonitorTest.java | 88 ++++++++++++++++++++++
.../main/java/org/apache/gobblin/runtime/Task.java | 2 +
.../monitoring/KafkaAvroJobStatusMonitor.java | 1 +
8 files changed, 173 insertions(+), 59 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 911ba2c..df15ee3 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -44,7 +44,6 @@ import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -73,7 +72,6 @@ import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.PropertiesUtils;
@@ -140,7 +138,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap,
Optional<GobblinHelixMetrics> helixMetrics) throws Exception {
- super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
+ super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}",
jobProps, appWorkDir);
this.helixManager = helixManager;
this.helixTaskDriver = new TaskDriver(this.helixManager);
@@ -574,57 +572,4 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
this.fs.delete(jobStateFilePath, false);
}
}
-
- /**
- * Inject in some additional properties
- * @param jobProps job properties
- * @param inputTags list of metadata tags
- * @return
- */
- private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties
jobProps,
- List<? extends Tag<?>> inputTags) {
- List<Tag<?>> metadataTags = Lists.newArrayList(inputTags);
- String jobId;
-
- // generate job id if not already set
- if (jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)) {
- jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
- } else {
- jobId =
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps));
- jobProps.put(ConfigurationKeys.JOB_ID_KEY, jobId);
- }
-
- String jobExecutionId = Long.toString(Id.Job.parse(jobId).getSequence());
-
- // only inject flow tags if a flow name is defined
- if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
- jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
- jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
-
- // use job execution id if flow execution id is not present
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
- jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobExecutionId)));
- }
-
- if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
- "false"));
- }
-
- metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
- metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
- jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
- metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
-
- LOGGER.debug("GobblinHelixJobLauncher.addAdditionalMetadataTags:
metadataTags {}", metadataTags);
-
- return metadataTags;
- }
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index e26f5dd..7e43c16 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -227,7 +227,8 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
this.jobHelixManager,
this.taskDriverHelixManager,
this.jobsMapping,
- this.locks).call();
+ this.locks,
+ this.metricContext).call();
}
@Override
@@ -257,7 +258,8 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
this.jobHelixManager,
this.taskDriverHelixManager,
this.jobsMapping,
- this.locks);
+ this.locks,
+ this.metricContext);
final Future<?> future = this.jobExecutor.submit(retriggeringJob);
return new Future() {
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 735da83..37da2c0 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -17,14 +17,20 @@
package org.apache.gobblin.cluster;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -104,6 +110,8 @@ class HelixRetriggeringJobCallable implements Callable {
private final String jobUri;
private boolean jobDeleteAttempted = false;
private final Striped<Lock> locks;
+ private final MetricContext metricContext;
+ private final EventSubmitter eventSubmitter;
public HelixRetriggeringJobCallable(
GobblinHelixJobScheduler jobScheduler,
@@ -117,7 +125,8 @@ class HelixRetriggeringJobCallable implements Callable {
HelixManager jobHelixManager,
Optional<HelixManager> taskDriverHelixManager,
HelixJobsMapping jobsMapping,
- Striped<Lock> locks) {
+ Striped<Lock> locks,
+ MetricContext metricContext) {
this.jobScheduler = jobScheduler;
this.jobCatalog = jobCatalog;
this.sysProps = sysProps;
@@ -132,6 +141,8 @@ class HelixRetriggeringJobCallable implements Callable {
this.jobUri =
jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI);
this.jobsMapping = jobsMapping;
this.locks = locks;
+ this.metricContext = metricContext;
+ eventSubmitter = new EventSubmitter.Builder(this.metricContext,
"gobblin.runtime").build();
}
private boolean isRetriggeringEnabled() {
@@ -266,6 +277,10 @@ class HelixRetriggeringJobCallable implements Callable {
try {
if (planningJobIdFromStore.isPresent() &&
!canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
+ TimingEvent timer = new TimingEvent(eventSubmitter,
TimingEvent.JOB_SKIPPED_TIME);
+ HashMap<String, String> metadata = new
HashMap<>(Tag.toMap(Tag.tagValuesToString(
+ HelixUtils.initBaseEventTags(jobProps, Lists.newArrayList()))));
+ timer.stop(metadata);
planningJobLauncherMetrics.skippedPlanningJobs.mark();
return;
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 80962b4..63149c4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,16 +17,23 @@
package org.apache.gobblin.cluster;
+import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -146,6 +153,59 @@ public class HelixUtils {
log.info("Work flow {} initialized", workFlowName);
}
+ /**
+ * Inject in some additional properties
+ * @param jobProps job properties
+ * @param inputTags list of metadata tags
+ * @return
+ */
+ public static List<? extends Tag<?>> initBaseEventTags(Properties jobProps,
+ List<? extends Tag<?>> inputTags) {
+ List<Tag<?>> metadataTags = Lists.newArrayList(inputTags);
+ String jobId;
+
+ // generate job id if not already set
+ if (jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)) {
+ jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
+ } else {
+ jobId =
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps));
+ jobProps.put(ConfigurationKeys.JOB_ID_KEY, jobId);
+ }
+
+ String jobExecutionId = Long.toString(Id.Job.parse(jobId).getSequence());
+
+ // only inject flow tags if a flow name is defined
+ if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+
+ // use job execution id if flow execution id is not present
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobExecutionId)));
+ }
+
+ if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION,
"1")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
+ "false"));
+ }
+
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
+ metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
+
+ log.debug("HelixUtils.addAdditionalMetadataTags: metadataTags {}",
metadataTags);
+
+ return metadataTags;
+ }
+
protected static boolean deleteTaskFromHelixJob(String workFlowName,
String jobName, String taskID, TaskDriver helixTaskDriver) {
try {
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 50fb856..39da68b 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -101,6 +101,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String METADATA_MESSAGE = "message";
public static final String JOB_ORCHESTRATED_TIME = "jobOrchestratedTime";
public static final String JOB_START_TIME = "jobStartTime";
+ public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
public static final String JOB_END_TIME = "jobEndTime";
public static final String JOB_LAST_PROGRESS_EVENT_TIME =
"jobLastProgressEventTime";
public static final String JOB_COMPLETION_PERCENTAGE =
"jobCompletionPercentage";
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index fd8f157..926f1d8 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -303,6 +303,75 @@ public class KafkaAvroJobStatusMonitorTest {
}
@Test
+ public void testProcessMessageForSkippedFlow() throws IOException,
ReflectiveOperationException {
+ KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic2");
+
+ //Submit GobblinTrackingEvents to Kafka
+ ImmutableList.of(
+ createFlowCompiledEvent(),
+ createJobOrchestratedEvent(1),
+ createJobSkippedEvent()
+ ).forEach(event -> {
+ context.submitEvent(event);
+ kafkaReporter.report();
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ MockKafkaAvroJobStatusMonitor jobStatusMonitor = new
MockKafkaAvroJobStatusMonitor("test",config, 1);
+ jobStatusMonitor.buildMetricsContextAndMetrics();
+
+ ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+ MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+ // Verify undecodeable message is skipped
+ byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
+ messageAndMetadata.message().length - 1);
+ ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC,
messageAndMetadata.partition(),
+ messageAndMetadata.offset(), messageAndMetadata.key(),
undecodeableMessage);
+ Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
0L);
+ jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+ Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
1L);
+ // Re-test when properly encoded, as expected for a normal event
+
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
+
+ StateStore stateStore = jobStatusMonitor.getStateStore();
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ String tableName =
KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId, "NA", "NA");
+ List<State> stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ State state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.COMPILED.name());
+
+ Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
+ iterator,
+ this::convertMessageAndMetadataToDecodableKafkaRecord);
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ tableName = KafkaJobStatusMonitor.jobStatusTableName(this.flowExecutionId,
this.jobGroup, this.jobName);
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.ORCHESTRATED.name());
+
+ jobStatusMonitor.processMessage(recordIterator.next());
+
+ stateList = stateStore.getAll(storeName, tableName);
+ Assert.assertEquals(stateList.size(), 1);
+ state = stateList.get(0);
+ Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD),
ExecutionStatus.CANCELLED.name());
+ }
+
+ @Test
public void testProcessingRetriedForApparentlyTransientErrors() throws
IOException, ReflectiveOperationException {
KafkaEventReporter kafkaReporter = builder.build("localhost:0000",
"topic3");
@@ -433,6 +502,25 @@ public class KafkaAvroJobStatusMonitorTest {
return event;
}
+ private GobblinTrackingEvent createJobSkippedEvent() {
+ String namespace = "org.apache.gobblin.metrics";
+ Long timestamp = System.currentTimeMillis();
+ String name = TimingEvent.JOB_SKIPPED_TIME;
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
this.flowGroup);
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
this.flowName);
+ metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
this.flowExecutionId);
+ metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+ metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
this.jobGroup);
+ metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD,
this.jobExecutionId);
+ metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+ metadata.put(TimingEvent.METADATA_START_TIME, "5");
+ metadata.put(TimingEvent.METADATA_END_TIME, "6");
+
+ GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp,
namespace, name, metadata);
+ return event;
+ }
+
private GobblinTrackingEvent createJobSucceededEvent() {
String namespace = "org.apache.gobblin.metrics";
Long timestamp = System.currentTimeMillis();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index b46feba..9c01964 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -1048,6 +1048,8 @@ public class Task implements TaskIFace {
public synchronized boolean cancel() {
LOG.info("Calling task cancel with interrupt flag: {}",
this.shouldInterruptTaskOnCancel);
if (this.taskFuture != null &&
this.taskFuture.cancel(this.shouldInterruptTaskOnCancel)) {
+ // Make sure to mark running task as done
+ this.taskStateTracker.onTaskCommitCompletion(this);
return true;
} else {
return false;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index dd6b343..4c7ff29 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -160,6 +160,7 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
break;
case TimingEvent.FlowTimings.FLOW_CANCELLED:
case TimingEvent.LauncherTimings.JOB_CANCEL:
+ case TimingEvent.JOB_SKIPPED_TIME:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
properties.put(TimingEvent.JOB_END_TIME,
properties.getProperty(TimingEvent.METADATA_END_TIME));
break;