This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e95bd3  [GOBBLIN-677] Allow early termination of Gobblin jobs based 
on a predicate on the job progress
9e95bd3 is described below

commit 9e95bd32d6115175b532dbbd9f7e73bae8059c02
Author: ibuenros <[email protected]>
AuthorDate: Mon Mar 4 10:08:31 2019 -0800

    [GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate 
on the job progress
    
    Closes #2548 from ibuenros/early-termination-2
---
 .../gobblin/runtime/JobShutdownException.java      |  27 ++
 .../apache/gobblin/source/extractor/Extractor.java |  31 +-
 .../runtime/embedded/EmbeddedGobblinDistcp.java    |   2 +-
 .../embedded/EmbeddedGobblinDistcpTest.java        |   1 +
 .../extractor/extract/kafka/KafkaExtractor.java    |  14 +
 gobblin-runtime/build.gradle                       |   5 +-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   |  42 +-
 .../java/org/apache/gobblin/runtime/JobState.java  |  23 +-
 .../main/java/org/apache/gobblin/runtime/Task.java |   8 +-
 .../java/org/apache/gobblin/runtime/TaskState.java |   3 +-
 .../gobblin/runtime/embedded/EmbeddedGobblin.java  |   7 +-
 .../runtime/job/GobblinJobFiniteStateMachine.java  | 180 ++++++++
 .../runtime/job/JobInterruptionPredicate.java      | 111 +++++
 .../apache/gobblin/runtime/job/JobProgress.java    |  38 ++
 .../apache/gobblin/runtime/job/TaskProgress.java   |  33 ++
 .../gobblin/runtime/local/LocalJobLauncher.java    |   5 +
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 126 ++++--
 .../spec_catalog/SpecCatalogListenersList.java     |  34 --
 .../gobblin/util/ReflectivePredicateEvaluator.java | 370 ++++++++++++++++
 .../runtime/job/JobInterruptionPredicateTest.java  | 133 ++++++
 .../util/ReflectivePredicateEvaluatorTest.java     | 125 ++++++
 .../org/apache/gobblin/fsm/FiniteStateMachine.java | 463 +++++++++++++++++++++
 .../org/apache/gobblin/fsm/StateWithCallbacks.java |  45 ++
 .../apache/gobblin/fsm/FiniteStateMachineTest.java | 344 +++++++++++++++
 gradle/scripts/defaultBuildProperties.gradle       |   2 +-
 gradle/scripts/dependencyDefinitions.gradle        |   4 +-
 gradle/scripts/globalDependencies.gradle           |   1 +
 27 files changed, 2095 insertions(+), 82 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
new file mode 100644
index 0000000..2c7d55a
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+/**
+ * An exception thrown when a job cannot be gracefully shut down.
+ */
+public class JobShutdownException extends Exception {
+       public JobShutdownException(String message) {
+               super(message);
+       }
+}
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
index 9749795..2ea900b 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
@@ -23,8 +23,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.gobblin.metadata.GlobalMetadata;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
+import org.apache.gobblin.util.Decorator;
 
 import edu.umd.cs.findbugs.annotations.SuppressWarnings;
 import io.reactivex.Emitter;
@@ -54,7 +56,7 @@ public interface Extractor<S, D> extends Closeable {
    * @return schema of the extracted data records
    * @throws java.io.IOException if there is problem getting the schema
    */
-  public S getSchema() throws IOException;
+  S getSchema() throws IOException;
 
   /**
    * Read the next data record from the data source.
@@ -78,7 +80,7 @@ public interface Extractor<S, D> extends Closeable {
    *
    * @return the expected source record count
    */
-  public long getExpectedRecordCount();
+  long getExpectedRecordCount();
 
   /**
    * Get the calculated high watermark up to which data records are to be 
extracted.
@@ -88,7 +90,23 @@ public interface Extractor<S, D> extends Closeable {
    * <a 
href="https://github.com/linkedin/gobblin/wiki/Watermarks">Watermarks</a> for 
more information.
    */
   @Deprecated
-  public long getHighWatermark();
+  long getHighWatermark();
+
+  /**
+   * Called to notify the Extractor it should shut down as soon as possible. 
If this call returns successfully, the task
+   * will continue consuming records from the Extractor and continue execution 
normally. The extractor should only emit
+   * those records necessary to stop at a graceful committable state. Most job 
executors will eventually kill the task
+   * if the Extractor does not stop emitting records after a few seconds.
+   *
+   * @throws JobShutdownException if the extractor does not support early 
termination. This will cause the task to fail.
+   */
+  default void shutdown() throws JobShutdownException {
+    if (this instanceof Decorator && ((Decorator) this).getDecoratedObject() 
instanceof Extractor) {
+      ((Extractor) ((Decorator) this).getDecoratedObject()).shutdown();
+    } else {
+      throw new JobShutdownException(this.getClass().getName() + ": Extractor 
does not support shutdown.");
+    }
+  }
 
   /**
    * Read an {@link RecordEnvelope}. By default, just wrap {@link 
#readRecord(Object)} in a {@link RecordEnvelope}.
@@ -115,7 +133,12 @@ public interface Extractor<S, D> extends Closeable {
     S schema = getSchema();
     Flowable<StreamEntity<D>> recordStream = Flowable.generate(() -> 
shutdownRequest, (BiConsumer<AtomicBoolean, Emitter<StreamEntity<D>>>) (state, 
emitter) -> {
       if (state.get()) {
-        emitter.onComplete();
+        // shutdown requested
+        try {
+          shutdown();
+        } catch (JobShutdownException exc) {
+          emitter.onError(exc);
+        }
       }
       try {
         StreamEntity<D> record = readStreamEntity();
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
index e52748f..f7076bd 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
@@ -82,7 +82,7 @@ public class EmbeddedGobblinDistcp extends EmbeddedGobblin {
     this.setConfiguration(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
to.getFileSystem(new Configuration()).getUri().toString());
 
     // add gobblin-data-management jar to distributed jars
-    this.distributeJar(ClassUtil.findContainingJar(CopySource.class));
+    this.distributeJarByClassWithPriority(CopySource.class, 0);
   }
 
   /**
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 3b289c0..c12dea4 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index e54a4b6..0a4685f 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -23,7 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,6 +114,8 @@ public abstract class KafkaExtractor<S, D> extends 
EventBasedExtractor<S, D> {
   private long currentPartitionReadRecordTime = 0;
   protected D currentPartitionLastSuccessfulRecord = null;
 
+  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
   public KafkaExtractor(WorkUnitState state) {
     super(state);
     this.workUnitState = state;
@@ -164,6 +168,10 @@ public abstract class KafkaExtractor<S, D> extends 
EventBasedExtractor<S, D> {
   @SuppressWarnings("unchecked")
   @Override
   public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+    if (this.shutdownRequested.get()) {
+      return null;
+    }
+
     long readStartTime = System.nanoTime();
 
     while (!allPartitionsFinished()) {
@@ -245,6 +253,12 @@ public abstract class KafkaExtractor<S, D> extends 
EventBasedExtractor<S, D> {
     return null;
   }
 
+  @Override
+  public void shutdown()
+      throws JobShutdownException {
+    this.shutdownRequested.set(true);
+  }
+
   private boolean allPartitionsFinished() {
     return this.currentPartitionIdx != INITIAL_PARTITION_IDX && 
this.currentPartitionIdx >= this.highWatermark.size();
   }
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 88e0c10..71dd510 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -49,6 +49,8 @@ dependencies {
 
   compile externalDependency.avro
   compile externalDependency.avroMapredH2
+  compile externalDependency.calciteCore
+  //compile externalDependency.calciteAvatica
   compile externalDependency.commonsCli
   compile externalDependency.commonsConfiguration
   compile externalDependency.commonsEmail
@@ -83,8 +85,6 @@ dependencies {
   compile externalDependency.kryo
 
   testCompile project(path: ":gobblin-metastore", configuration: 
"testFixtures")
-  testCompile externalDependency.calciteCore
-  testCompile externalDependency.calciteAvatica
   testCompile externalDependency.jhyde
   testCompile externalDependency.testng
   testCompile externalDependency.hamcrest
@@ -150,6 +150,7 @@ jmh {
 }
 
 test {
+    systemProperty "org.jboss.byteman.verbose", "true"
     workingDir rootProject.rootDir
     maxParallelForks = 1
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e800107..ccc94cb 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -28,8 +28,8 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +60,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
 import javax.annotation.Nullable;
+import lombok.Setter;
 
 
 /**
@@ -93,6 +94,8 @@ public class GobblinMultiTaskAttempt {
   private final Optional<String> containerIdOptional;
   private final Optional<StateStore<TaskState>> taskStateStoreOptional;
   private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+  @Setter
+  private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) -> 
false;
   private List<Task> tasks;
 
   /**
@@ -139,16 +142,39 @@ public class GobblinMultiTaskAttempt {
     this.tasks = runWorkUnits(countDownLatch);
     log.info("Waiting for submitted tasks of job {} to complete in container 
{}...", jobId,
         containerIdOptional.or(""));
-    while (countDownLatch.getCount() > 0) {
-      log.info(String.format("%d out of %d tasks of job %s are running in 
container %s", countDownLatch.getCount(),
-          countDownLatch.getRegisteredParties(), jobId, 
containerIdOptional.or("")));
-      if (countDownLatch.await(10, TimeUnit.SECONDS)) {
-        break;
+    try {
+      while (countDownLatch.getCount() > 0) {
+        if (this.interruptionPredicate.test(this)) {
+          log.info("Interrupting task execution due to satisfied predicate.");
+          interruptTaskExecution(countDownLatch);
+          break;
+        }
+        log.info(String.format("%d out of %d tasks of job %s are running in 
container %s", countDownLatch.getCount(),
+            countDownLatch.getRegisteredParties(), jobId, 
containerIdOptional.or("")));
+        if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+          break;
+        }
       }
+    } catch (InterruptedException interrupt) {
+      log.info("Job interrupted by InterrupedException.");
+      interruptTaskExecution(countDownLatch);
     }
     log.info("All assigned tasks of job {} have completed in container {}", 
jobId, containerIdOptional.or(""));
   }
 
+  private void interruptTaskExecution(CountDownLatch countDownLatch) throws 
InterruptedException {
+    log.info("Job interrupted. Attempting a graceful shutdown of the job.");
+    this.tasks.forEach(Task::shutdown);
+    if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
+      log.warn("Graceful shutdown of job timed out. Killing all outstanding 
tasks.");
+      try {
+        this.taskExecutor.shutDown();
+      } catch (Throwable t) {
+        throw new RuntimeException("Failed to shutdown task executor.", t);
+      }
+    }
+  }
+
   /**
    * Commit {@link #tasks} by 1. calling {@link Task#commit()} in parallel; 2. 
executing any additional {@link CommitStep};
    * 3. persist task statestore.
@@ -468,7 +494,8 @@ public class GobblinMultiTaskAttempt {
   public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String 
containerId, JobState jobState,
       List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, 
TaskExecutor taskExecutor,
       StateStore<TaskState> taskStateStore,
-      CommitPolicy multiTaskAttemptCommitPolicy, 
SharedResourcesBroker<GobblinScopeTypes> jobBroker)
+      CommitPolicy multiTaskAttemptCommitPolicy, 
SharedResourcesBroker<GobblinScopeTypes> jobBroker,
+      Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
       throws IOException, InterruptedException {
 
     // dump the work unit if tracking logs are enabled
@@ -480,6 +507,7 @@ public class GobblinMultiTaskAttempt {
     GobblinMultiTaskAttempt multiTaskAttempt =
         new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, 
taskStateTracker, taskExecutor,
             Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
+    multiTaskAttempt.setInterruptionPredicate(interruptionPredicate);
 
     
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
     return multiTaskAttempt;
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index c0f9271..45d3876 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.job.JobProgress;
+import org.apache.gobblin.runtime.job.TaskProgress;
 import org.apache.hadoop.io.Text;
 
 import com.codahale.metrics.Counter;
@@ -67,7 +69,7 @@ import org.apache.gobblin.util.ImmutableProperties;
  *
  * @author Yinan Li
  */
-public class JobState extends SourceState {
+public class JobState extends SourceState implements JobProgress {
 
   /**
    * An enumeration of possible job states, which are identical to
@@ -248,6 +250,20 @@ public class JobState extends SourceState {
   }
 
   /**
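+   * Get the time elapsed so far for this job, or its total duration once an end time has been set.
+   * @return the elapsed time in milliseconds, or 0 if neither a start time nor an end time has been set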
+   */
+  public long getElapsedTime() {
+    if (this.endTime > 0) {
+      return  this.endTime - this.startTime;
+    }
+    if (this.startTime > 0) {
+      return System.currentTimeMillis() - this.startTime;
+    }
+    return 0;
+  }
+
+  /**
    * Set job end time.
    *
    * @param endTime job end time
@@ -393,6 +409,11 @@ public class JobState extends SourceState {
     return 
ImmutableList.<TaskState>builder().addAll(this.taskStates.values()).build();
   }
 
+  @Override
+  public List<TaskState> getTaskProgress() {
+    return getTaskStates();
+  }
+
   /**
    * Create a {@link Map} from dataset URNs (as being specified by {@link 
ConfigurationKeys#DATASET_URN_KEY} to
    * {@link DatasetState} objects that represent the dataset states and store 
{@link TaskState}s corresponding
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 049a3ff..8ddd1d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -462,7 +462,7 @@ public class Task implements TaskIFace {
 
       RecordEnvelope recordEnvelope;
       // Extract, convert, and fork one source record at a time.
-      while (!shutdownRequested() && (recordEnvelope = 
extractor.readRecordEnvelope()) != null) {
+      while ((recordEnvelope = extractor.readRecordEnvelope()) != null) {
         onRecordExtract();
         AcknowledgableWatermark ackableWatermark = new 
AcknowledgableWatermark(recordEnvelope.getWatermark());
         if (watermarkTracker.isPresent()) {
@@ -473,6 +473,9 @@ public class Task implements TaskIFace {
               ackableWatermark.incrementAck());
         }
         ackableWatermark.ack();
+        if (shutdownRequested()) {
+          extractor.shutdown();
+        }
       }
     } else {
       RecordEnvelope record;
@@ -495,6 +498,9 @@ public class Task implements TaskIFace {
             throw new RuntimeException(e);
           }
         }
+        if (shutdownRequested()) {
+          extractor.shutdown();
+        }
       }
     }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index a9d99be..50bba37 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.gobblin.runtime.job.TaskProgress;
 import org.apache.hadoop.io.Text;
 
 import com.codahale.metrics.Counter;
@@ -59,7 +60,7 @@ import lombok.Getter;
  *
  * @author Yinan Li
  */
-public class TaskState extends WorkUnitState {
+public class TaskState extends WorkUnitState implements TaskProgress {
 
   // Built-in metric names
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index b6cc3b5..dced320 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -200,7 +200,12 @@ public class EmbeddedGobblin {
    * will appear first in the classpath. Default priority is 0.
    */
   public EmbeddedGobblin distributeJarByClassWithPriority(Class<?> klazz, int 
priority) {
-    return distributeJarWithPriority(ClassUtil.findContainingJar(klazz), 
priority);
+    String jar = ClassUtil.findContainingJar(klazz);
+    if (jar == null) {
+      log.warn(String.format("Could not find jar for class %s. This is normal 
in test runs.", klazz));
+      return this;
+    }
+    return distributeJarWithPriority(jar, priority);
   }
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
new file mode 100644
index 0000000..d36f68c
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.io.IOException;
+
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.JobState;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link FiniteStateMachine} implementation to track the state of a Gobblin 
job executor.
+ */
+@Slf4j
+public class GobblinJobFiniteStateMachine extends 
FiniteStateMachine<GobblinJobFiniteStateMachine.JobFSMState> {
+
+       /**
+        * Types of state the job can be in.
+        */
+       public enum StateType {
+               PREPARING, RUNNING, INTERRUPTED, CANCELLED, SUCCESS, FAILED
+       }
+
+       /**
+        * State of a job.
+        */
+       @AllArgsConstructor(access = AccessLevel.PRIVATE)
+       @EqualsAndHashCode(of = "stateType")
+       @ToString
+       @Getter
+       public static class JobFSMState {
+               private final StateType stateType;
+       }
+
+       /**
+        * A special {@link JobFSMState} that is aware of how to interrupt a 
running job.
+        */
+       private class RunnableState extends JobFSMState implements 
StateWithCallbacks<JobFSMState> {
+               private final JobInterruptionPredicate jobInterruptionPredicate;
+
+               public RunnableState() {
+                       super(StateType.RUNNING);
+                       if 
(GobblinJobFiniteStateMachine.this.interruptGracefully == null) {
+                               this.jobInterruptionPredicate = null;
+                       } else {
+                               this.jobInterruptionPredicate = new 
JobInterruptionPredicate(GobblinJobFiniteStateMachine.this.jobState,
+                                               
GobblinJobFiniteStateMachine.this::interruptRunningJob, false);
+                       }
+               }
+
+               @Override
+               public void onEnterState(@Nullable JobFSMState previousState) {
+                       if (this.jobInterruptionPredicate != null) {
+                               this.jobInterruptionPredicate.startAsync();
+                       }
+               }
+
+               @Override
+               public void onLeaveState(JobFSMState nextState) {
+                       if (this.jobInterruptionPredicate != null) {
+                               this.jobInterruptionPredicate.stopAsync();
+                       }
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       return super.equals(o);
+               }
+
+               @Override
+               public int hashCode() {
+                       return super.hashCode();
+               }
+       }
+
+       /**
+        * A runnable that allows for {@link IOException}s.
+        */
+       @FunctionalInterface
+       public interface RunnableWithIoException {
+               void run() throws IOException;
+       }
+
+       private final JobState jobState;
+       private final RunnableWithIoException interruptGracefully;
+       private final RunnableWithIoException killJob;
+
+       @lombok.Builder
+       private GobblinJobFiniteStateMachine(JobState jobState, 
RunnableWithIoException interruptGracefully,
+                       RunnableWithIoException killJob) {
+               super(buildAllowedTransitions(), Sets.newHashSet(new 
JobFSMState(StateType.CANCELLED)), new JobFSMState(StateType.FAILED),
+                               new JobFSMState(StateType.PREPARING));
+
+               if (jobState == null) {
+                       throw new IllegalArgumentException("Job state is 
required.");
+               }
+
+               this.jobState = jobState;
+               this.interruptGracefully = interruptGracefully;
+               this.killJob = killJob;
+       }
+
+       /**
+        * Callers should use this method to obtain the {@link JobFSMState} for 
a particular {@link StateType}, as the
+        * {@link JobFSMState} might contain additional functionality like 
running other services, etc.
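+        * @param stateType the {@link StateType} that the returned state should represent
+        * @return a new {@link RunnableState} for {@link StateType#RUNNING}, otherwise a plain {@link JobFSMState} of the given type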
+        */
+       public JobFSMState getEndStateForType(StateType stateType) {
+               switch (stateType) {
+                       case RUNNING:
+                               return new RunnableState();
+                       default:
+                               return new JobFSMState(stateType);
+               }
+       }
+
+       private void interruptRunningJob() {
+               log.info("Interrupting job execution.");
+               try (FiniteStateMachine<JobFSMState>.Transition transition = 
startTransition(getEndStateForType(StateType.INTERRUPTED))) {
+                               try {
+                                       this.interruptGracefully.run();
+                               } catch (IOException ioe) {
+                                       
transition.changeEndState(getEndStateForType(StateType.FAILED));
+                               }
+               } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+                       log.error("Cannot interrupt job.", exc);
+               } catch (InterruptedException | 
FailedTransitionCallbackException exc) {
+                       log.error("Cannot finish graceful job interruption. 
Killing job.", exc);
+                       try {
+                               this.killJob.run();
+                       } catch (IOException ioe) {
+                               log.error("Failed to kill job.", ioe);
+                       }
+                       if (exc instanceof FailedTransitionCallbackException) {
+                               ((FailedTransitionCallbackException) 
exc).getTransition().switchEndStateToErrorState();
+                               ((FailedTransitionCallbackException) 
exc).getTransition().closeWithoutCallbacks();
+                       }
+               }
+       }
+
+       private static SetMultimap<JobFSMState, JobFSMState> 
buildAllowedTransitions() {
+               SetMultimap<JobFSMState, JobFSMState> transitions = 
HashMultimap.create();
+               transitions.put(new JobFSMState(StateType.PREPARING), new 
JobFSMState(StateType.RUNNING));
+               transitions.put(new JobFSMState(StateType.PREPARING), new 
JobFSMState(StateType.FAILED));
+               transitions.put(new JobFSMState(StateType.PREPARING), new 
JobFSMState(StateType.INTERRUPTED));
+               transitions.put(new JobFSMState(StateType.RUNNING), new 
JobFSMState(StateType.SUCCESS));
+               transitions.put(new JobFSMState(StateType.RUNNING), new 
JobFSMState(StateType.FAILED));
+               transitions.put(new JobFSMState(StateType.RUNNING), new 
JobFSMState(StateType.INTERRUPTED));
+               return transitions;
+       }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
new file mode 100644
index 0000000..6f67657
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.ReflectivePredicateEvaluator;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * This class evaluates a predicate on the {@link JobProgress} of a job and 
calls a job interruption hook when the
+ * predicate is satisfied.
+ *
+ * It is used to preemptively stop jobs after they satisfy some completion 
predicate (e.g. more than 15 minutes have
+ * elapsed and at least 75% of tasks have finished).
+ */
+@Slf4j
+public class JobInterruptionPredicate extends AbstractScheduledService {
+
+       public static final String INTERRUPTION_SQL = 
"org.apache.gobblin.jobInterruptionPredicate.sql";
+
+       private final String sql;
+       private final ReflectivePredicateEvaluator evaluator;
+       private final JobProgress jobProgress;
+       private final Runnable jobInterruptionHook;
+
+       public JobInterruptionPredicate(JobState jobState, Runnable 
jobInterruptionHook, boolean autoStart) {
+               this(jobState, jobState.getProp(INTERRUPTION_SQL), 
jobInterruptionHook, autoStart);
+       }
+
+       protected JobInterruptionPredicate(JobProgress jobProgress, String 
predicate,
+                       Runnable jobInterruptionHook, boolean autoStart) {
+               this.sql = predicate;
+
+               ReflectivePredicateEvaluator tmpEval = null;
+               if (this.sql != null) {
+                       try {
+                               tmpEval = new 
ReflectivePredicateEvaluator(this.sql, JobProgress.class, TaskProgress.class);
+                       } catch (SQLException exc) {
+                               log.warn("Job interruption predicate is 
invalid, will not preemptively interrupt job.", exc);
+                       }
+               }
+               this.evaluator = tmpEval;
+               this.jobProgress = jobProgress;
+               this.jobInterruptionHook = jobInterruptionHook;
+
+               if (autoStart && this.sql != null) {
+                       startAsync();
+               }
+       }
+
+       @Override
+       protected void runOneIteration() {
+               if (this.evaluator == null) {
+                       stopAsync();
+                       return;
+               }
+               switch (this.jobProgress.getState()) {
+                       case PENDING:
+                               return;
+                       case RUNNING:
+                               try {
+                                       List<Object> objects = 
Stream.concat(Stream.<Object>of(this.jobProgress), 
this.jobProgress.getTaskProgress().stream()).collect(
+                                                       Collectors.toList());
+                                       if (this.evaluator.evaluate(objects)) {
+                                               log.info("Interrupting job due 
to satisfied job interruption predicate. Predicate: " + this.sql);
+                                               this.jobInterruptionHook.run();
+                                               stopAsync();
+                                       }
+                               } catch (Throwable exc) {
+                                       log.warn("Failed to evaluate job 
interruption predicate. Will not preemptively interrupt job.", exc);
+                                       throw Throwables.propagate(exc);
+                               }
+                               break;
+                       default:
+                               log.info(String.format("Detected job finished 
with state %s. Stopping job interruption predicate.",
+                                               this.jobProgress.getState()));
+                               stopAsync();
+               }
+       }
+
+       @Override
+       protected Scheduler scheduler() {
+               return Scheduler.newFixedDelaySchedule(30, 30, 
TimeUnit.SECONDS);
+       }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
new file mode 100644
index 0000000..ad1308a
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.List;
+
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * Type used to retrieve the progress of a Gobblin job.
+ */
+public interface JobProgress {
+
+       String getJobId();
+       int getTaskCount();
+       int getCompletedTasks();
+       long getElapsedTime();
+       JobState.RunningState getState();
+
+       List<? extends TaskProgress> getTaskProgress();
+
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
new file mode 100644
index 0000000..1ec8d61
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/**
+ * Interface used to retrieve the progress of a task in a Gobblin job.
+ */
+public interface TaskProgress {
+
+       String getJobId();
+       String getTaskId();
+       WorkUnitState.WorkingState getWorkingState();
+       boolean isCompleted();
+
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index fc0e197..6b6bc16 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.job.JobInterruptionPredicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,8 +144,12 @@ public class LocalJobLauncher extends AbstractJobLauncher {
       }
     });
 
+    Thread thisThread = Thread.currentThread();
+    JobInterruptionPredicate jobInterruptionPredicate =
+        new JobInterruptionPredicate(jobState, () -> thisThread.interrupt(), 
true);
     GobblinMultiTaskAttempt.runWorkUnits(this.jobContext, 
workUnitsWithJobState, this.taskStateTracker,
         this.taskExecutor, GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+    jobInterruptionPredicate.stopAsync();
 
     if (this.cancellationRequested) {
       // Wait for the cancellation execution if it has been requested
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1f53054..529fe11 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,6 +27,9 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +85,8 @@ import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
 import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
 import org.apache.gobblin.runtime.util.JobMetrics;
 import org.apache.gobblin.runtime.util.MetricGroup;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -110,6 +115,9 @@ import org.apache.gobblin.util.SerializationUtils;
  */
 public class MRJobLauncher extends AbstractJobLauncher {
 
+  private static final String INTERRUPT_JOB_FILE_NAME = "_INTERRUPT_JOB";
+  private static final String GOBBLIN_JOB_INTERRUPT_PATH_KEY = 
"gobblin.jobInterruptPath";
+
   private static final Logger LOG = 
LoggerFactory.getLogger(MRJobLauncher.class);
 
   private static final String JOB_NAME_PREFIX = "Gobblin-";
@@ -148,6 +156,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private final StateStore<TaskState> taskStateStore;
 
   private final int jarFileMaximumRetry;
+  private final Path interruptPath;
+  private final GobblinJobFiniteStateMachine fsm;
 
   public MRJobLauncher(Properties jobProps) throws Exception {
     this(jobProps, null);
@@ -170,6 +180,9 @@ public class MRJobLauncher extends AbstractJobLauncher {
       throws Exception {
     super(jobProps, metadataTags);
 
+    this.fsm = 
GobblinJobFiniteStateMachine.builder().jobState(jobContext.getJobState())
+        
.interruptGracefully(this::interruptGracefully).killJob(this::killJob).build();
+
     this.conf = conf;
     // Put job configuration properties into the Hadoop configuration so they 
are available in the mappers
     JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps, 
this.conf);
@@ -185,6 +198,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     this.mrJobDir = new Path(
         new 
Path(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), 
this.jobContext.getJobName()),
         this.jobContext.getJobId());
+    this.interruptPath = new Path(this.mrJobDir, INTERRUPT_JOB_FILE_NAME);
     if (this.fs.exists(this.mrJobDir)) {
       LOG.warn("Job working directory already exists for job " + 
this.jobContext.getJobName());
       this.fs.delete(this.mrJobDir, true);
@@ -252,26 +266,35 @@ public class MRJobLauncher extends AbstractJobLauncher {
       this.taskStateCollectorService.startAsync().awaitRunning();
 
       LOG.info("Launching Hadoop MR job " + this.job.getJobName());
-      this.job.submit();
-      this.hadoopJobSubmitted = true;
+      try (FiniteStateMachine<JobFSMState>.Transition t = 
this.fsm.startTransition(this.fsm.getEndStateForType(StateType.RUNNING))) {
+        try {
+          this.job.submit();
+        } catch (Throwable exc) {
+          t.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+          throw exc;
+        }
+        this.hadoopJobSubmitted = true;
 
-      // Set job tracking URL to the Hadoop job tracking URL if it is not set 
yet
-      if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
-        jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, 
this.job.getTrackingURL());
+        // Set job tracking URL to the Hadoop job tracking URL if it is not 
set yet
+        if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+          jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, 
this.job.getTrackingURL());
+        }
+      } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+        LOG.error("Cannot start MR job.", unallowed);
       }
 
-      TimingEvent mrJobRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
-      LOG.info(String.format("Waiting for Hadoop MR job %s to complete", 
this.job.getJobID()));
-      this.job.waitForCompletion(true);
-      mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", 
this.job.getJobID().toString()));
+      if (this.fsm.getCurrentState().getStateType().equals(StateType.RUNNING)) 
{
+        TimingEvent mrJobRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
+        LOG.info(String.format("Waiting for Hadoop MR job %s to complete", 
this.job.getJobID()));
 
-      if (this.cancellationRequested) {
-        // Wait for the cancellation execution if it has been requested
-        synchronized (this.cancellationExecution) {
-          if (this.cancellationExecuted) {
-            return;
-          }
-        }
+        this.job.waitForCompletion(true);
+        
this.fsm.transitionIfAllowed(fsm.getEndStateForType(StateType.SUCCESS));
+
+        mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", 
this.job.getJobID().toString()));
+      }
+
+      if 
(this.fsm.getCurrentState().getStateType().equals(StateType.CANCELLED)) {
+        return;
       }
 
       // Create a metrics set for this job run from the Hadoop counters.
@@ -286,18 +309,52 @@ public class MRJobLauncher extends AbstractJobLauncher {
 
   @Override
   protected void executeCancellation() {
-    try {
-      if (this.hadoopJobSubmitted && !this.job.isComplete()) {
-        LOG.info("Killing the Hadoop MR job for job " + 
this.jobContext.getJobId());
-        this.job.killJob();
-        // Collect final task states.
-        this.taskStateCollectorService.stopAsync().awaitTerminated();
+    try (FiniteStateMachine<JobFSMState>.Transition transition =
+        
this.fsm.startTransition(this.fsm.getEndStateForType(StateType.CANCELLED))) {
+      if (transition.getStartState().getStateType().equals(StateType.RUNNING)) 
{
+        try {
+          killJob();
+        } catch (IOException ioe) {
+          LOG.error("Failed to kill the Hadoop MR job for job " + 
this.jobContext.getJobId());
+          
transition.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+        }
+      }
+    } catch (GobblinJobFiniteStateMachine.FailedTransitionCallbackException 
exc) {
+      exc.getTransition().switchEndStateToErrorState();
+      exc.getTransition().closeWithoutCallbacks();
+    } catch (FiniteStateMachine.UnallowedTransitionException | 
InterruptedException exc) {
+      LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+    }
+  }
+
+  /**
+   * Attempt a graceful interruption of the running job
+   */
+  private void interruptGracefully() throws IOException {
+    LOG.info("Attempting graceful interruption of job " + 
this.jobContext.getJobId());
+
+    this.fs.createNewFile(this.interruptPath);
+
+    long waitTimeStart = System.currentTimeMillis();
+    while (!this.job.isComplete() && System.currentTimeMillis() < 
waitTimeStart + 30 * 1000) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        break;
       }
-    } catch (IllegalStateException ise) {
-      LOG.error("The Hadoop MR job has not started for job " + 
this.jobContext.getJobId());
-    } catch (IOException ioe) {
-      LOG.error("Failed to kill the Hadoop MR job for job " + 
this.jobContext.getJobId());
     }
+
+    if (!this.job.isComplete()) {
+      LOG.info("Interrupted job did not shut itself down after timeout. 
Killing job.");
+      this.job.killJob();
+    }
+  }
+
+  private void killJob() throws IOException {
+    LOG.info("Killing the Hadoop MR job for job " + 
this.jobContext.getJobId());
+    this.job.killJob();
+    // Collect final task states.
+    this.taskStateCollectorService.stopAsync().awaitTerminated();
   }
 
   /**
@@ -381,6 +438,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
           
Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)));
     }
 
+    this.job.getConfiguration().set(GOBBLIN_JOB_INTERRUPT_PATH_KEY, 
this.interruptPath.toString());
+
     mrJobSetupTimer.stop();
   }
 
@@ -679,6 +738,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
     @Override
     public void run(Context context) throws IOException, InterruptedException {
       this.setup(context);
+
+      Path interruptPath = new 
Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
+      if (this.fs.exists(interruptPath)) {
+        LOG.info(String.format("Found interrupt path %s indicating the driver 
has interrupted the job, aborting mapper.", interruptPath));
+        return;
+      }
+
       GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
       try {
         // De-serialize and collect the list of WorkUnits to run
@@ -701,7 +767,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
         gobblinMultiTaskAttempt =
             GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), 
context.getTaskAttemptID().toString(),
                 this.jobState, this.workUnits, this.taskStateTracker, 
this.taskExecutor, this.taskStateStore,
-                multiTaskAttemptCommitPolicy, jobBroker);
+                multiTaskAttemptCommitPolicy, jobBroker, (gmta) -> {
+                  try {
+                    return this.fs.exists(interruptPath);
+                  } catch (IOException ioe) {
+                    return false;
+                  }
+                });
 
         if (this.isSpeculativeEnabled) {
           LOG.info("will not commit in task attempt");
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index f2cd04b..9a798c1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -15,40 +15,6 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.gobblin.runtime.spec_catalog;
 
 import java.io.Closeable;
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
new file mode 100644
index 0000000..ead087d
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+
+import lombok.Data;
+
+
+/**
+ * A predicate evaluator that uses an interface to define a table schema and 
can evaluate SQL statements on instances of
+ * that interface. See {@link ReflectivePredicateEvaluatorTest} for examples.
+ *
+ * Note all evaluated statements must return a single row with a single 
boolean column.
+ *
+ * Usage:
+ * ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator("SELECT ... FROM myInterface", 
MyInterface.class);
+ * evaluator.evaluate(instance1, instance2, ...); // use the statement 
provided in constructor
+ * -- or --
+ * evaluator.evaluate("SELECT ... FROM myInterface", instance1, instance2, 
...);
+ */
+public class ReflectivePredicateEvaluator implements Closeable {
+       private static final String REFERENCE_INTERFACES = "refInterface";
+       private static final String OPERATOR_ID = "operatorId";
+       private static final Pattern FIELD_NAME_EXTRACTOR = 
Pattern.compile("(?:get([A-Z]))?(.+)");
+
+       private static final String MODEL_PATTERN = "{"
+                       + "version: 1, defaultSchema: 'MAIN',"
+                       + "schemas: ["
+                       + "{name: 'MAIN', type: 'custom', factory: '%s', 
operand: {%s: '%s', %s: '%d'}}"
+                       + "]}";
+       private static final String CONNECT_STRING_PATTERN = 
"jdbc:calcite:model=inline:%s";
+
+       private static final Cache<Integer, ReflectivePredicateEvaluator> 
REGISTRY = CacheBuilder.newBuilder().weakValues().build();
+       private static final AtomicInteger IDENTIFIER = new AtomicInteger();
+
+       private final List<Class<?>> referenceInterfaces;
+       private final int identifier;
+       private final Connection conn;
+       private final PreparedStatement stmnt;
+
+       private final String sql;
+
+       private volatile List<Object> objects;
+
+       /**
+        * @param sql The default SQL expression to run in this evaluator.
+        * @param referenceInterfaces The interfaces that will be used to 
generate the table schemas (one table per interface).
+        * @throws SQLException
+        */
+       public ReflectivePredicateEvaluator(String sql, Class<?>... 
referenceInterfaces) throws SQLException  {
+               this.referenceInterfaces = 
Lists.newArrayList(referenceInterfaces);
+               this.sql = sql;
+
+               this.identifier = IDENTIFIER.getAndIncrement();
+               REGISTRY.put(this.identifier, this);
+
+               String model = computeModel();
+               String connectString = String.format(CONNECT_STRING_PATTERN, 
model);
+
+               this.conn =
+                               DriverManager.getConnection(connectString);
+               this.stmnt = prepareStatement(sql);
+       }
+
+       private PreparedStatement prepareStatement(String sql) throws 
SQLException {
+               PreparedStatement stmnt = null;
+               try {
+                       stmnt = this.conn.prepareStatement(sql);
+                       validateSql(stmnt, sql);
+                       return stmnt;
+               } catch (Throwable t) {
+                       if (stmnt != null) {
+                               stmnt.close();
+                       }
+                       throw t;
+               }
+       }
+
+       private String computeModel() {
+               return String.format(MODEL_PATTERN, 
PESchemaFactory.class.getName(), REFERENCE_INTERFACES,
+                               
Joiner.on(",").join(this.referenceInterfaces.stream().map(Class::getName).collect(Collectors.toList())),
+                               OPERATOR_ID, this.identifier);
+       }
+
+       private void validateSql(PreparedStatement stmnt, String sql) throws 
SQLException {
+               ResultSetMetaData metaData = stmnt.getMetaData();
+
+               if (metaData.getColumnCount() != 1 || metaData.getColumnType(1) 
!= Types.BOOLEAN) {
+                       throw new IllegalArgumentException("Statement is 
expected to return a single boolean column. Provided statement: " + sql);
+               }
+       }
+
+       /**
+        * Evaluate the default predicate on the list of provided objects.
+        * @throws SQLException
+        */
+       public boolean evaluate(Object... objects) throws SQLException{
+               return evaluate(Lists.newArrayList(objects), null);
+       }
+
+       /**
+        * Evaluate an ad-hoc predicate on the list of provided objects.
+        * Note {@link #evaluate(Object...)} is preferable as it only does 
validation of the expression once.
+        * @throws SQLException
+        */
+       public boolean evaluate(String sql, Object... objects) throws 
SQLException{
+               return evaluate(Lists.newArrayList(objects), sql);
+       }
+
+       /**
+        * Evaluate the default predicate on the list of provided objects.
+        * @throws SQLException
+        */
+       public boolean evaluate(List<Object> objects) throws SQLException {
+               return evaluate(objects, null);
+       }
+
+       /**
+        * Evaluate an ad-hoc predicate on the list of provided objects.
+        * Note {@link #evaluate(Object[])} is preferable as it only does 
validation of the expression once.
+        * @throws SQLException
+        */
+       public boolean evaluate(List<Object> objects, String sql) throws 
SQLException {
+               synchronized (this) {
+                       String actualSql = sql == null ? this.sql : sql;
+                       PreparedStatement actualStmnt = null;
+                       try {
+                               actualStmnt = sql == null ? this.stmnt : 
prepareStatement(sql);
+
+                               this.objects = objects;
+                               actualStmnt.execute();
+                               ResultSet rs = actualStmnt.getResultSet();
+                               if (!rs.next()) {
+                                       throw new 
IllegalArgumentException("Expected at least one returned row. SQL evaluated: " 
+ actualSql);
+                               }
+                               boolean result = true;
+                               do {
+                                       result &= rs.getBoolean(1);
+                               } while (rs.next());
+                               return result;
+                       } finally {
+                               if (sql != null && actualStmnt != null) {
+                                       actualStmnt.close();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Releases the prepared statement and the connection held by this evaluator.
+        *
+        * @throws IOException wrapping any {@link SQLException} raised while closing either resource.
+        */
+       @Override
+       public void close()
+                       throws IOException {
+               try {
+                       try {
+                               if (this.stmnt != null) {
+                                       this.stmnt.close();
+                               }
+                       } finally {
+                               // Close the connection even if closing the statement failed, so it is never leaked.
+                               if (this.conn != null) {
+                                       this.conn.close();
+                               }
+                       }
+               } catch (SQLException exc) {
+                       throw new IOException("Failed to close " + ReflectivePredicateEvaluator.class.getSimpleName(), exc);
+               }
+       }
+
+       /**
+        * {@link SchemaFactory} that Calcite instantiates reflectively to build the evaluator's schema.
+        * It is public for that reason only; nothing else in Gobblin should reference it.
+        */
+       public static class PESchemaFactory implements SchemaFactory {
+
+               @Override
+               public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+                       final List<Class<?>> tableInterfaces = new ArrayList<>();
+                       final int evaluatorId;
+                       try {
+                               // The operand carries a comma-separated list of interface class names.
+                               String interfaceNames = operand.get(REFERENCE_INTERFACES).toString();
+                               for (String className : Splitter.on(",").splitToList(interfaceNames)) {
+                                       tableInterfaces.add(Class.forName(className));
+                               }
+                               evaluatorId = Integer.parseInt(operand.get(OPERATOR_ID).toString());
+                       } catch (ReflectiveOperationException roe) {
+                               throw new RuntimeException(roe);
+                       }
+
+                       return new AbstractSchema() {
+                               @Override
+                               protected Map<String, Table> getTableMap() {
+                                       // One table per reference interface, keyed by its upper-cased simple name.
+                                       HashMap<String, Table> tables = new HashMap<>();
+                                       tableInterfaces.forEach(
+                                                       iface -> tables.put(iface.getSimpleName().toUpperCase(), new PETable(iface, evaluatorId)));
+                                       return tables;
+                               }
+                       };
+               }
+       }
+
+       /**
+        * Calcite table whose rows are built from the objects currently held by the evaluator registered under
+        * {@code operatorIdentifier}. Every zero-argument method of the reference interface for which
+        * {@code computeFieldName} yields a name becomes a column; enum-typed columns are exposed as strings.
+        */
+       @Data
+       private static class PETable extends AbstractTable implements ProjectableFilterableTable {
+               private final Class<?> referenceInterface;
+               private final int operatorIdentifier;
+               private volatile boolean initialized = false;
+
+               private RelDataType rowType;
+               private List<Function<Object, Object>> methodsForFields = new ArrayList<>();
+
+               /**
+                * Builds one row per object that is an instance of the reference interface, containing only the
+                * projected columns. The {@code filters} are left untouched.
+                *
+                * @throws IllegalStateException if no evaluator is registered under this table's identifier.
+                */
+               @Override
+               public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters, int[] projects) {
+                       ReflectivePredicateEvaluator evaluator = REGISTRY.getIfPresent(this.operatorIdentifier);
+                       if (evaluator == null) {
+                               throw new IllegalStateException("No evaluator registered with identifier " + this.operatorIdentifier);
+                       }
+                       List<Object> list = evaluator.objects;
+
+                       final int[] actualProjects = resolveProjects(projects);
+
+                       // Materialize the rows once; isInstance also skips null elements instead of failing on them.
+                       final List<Object[]> rows = list.stream()
+                                       .filter(referenceInterface::isInstance)
+                                       .map(
+                                       m -> {
+                                               Object[] res = new Object[actualProjects.length];
+                                               for (int i = 0; i < actualProjects.length; i++) {
+                                                       res[i] = methodsForFields.get(actualProjects[i]).apply(m);
+                                               }
+                                               return res;
+                                       }
+                       ).collect(Collectors.toList());
+
+                       return new AbstractEnumerable<Object[]>() {
+                               @Override
+                               public Enumerator<Object[]> enumerator() {
+                                       // Hand out a fresh enumerator on every call so the rows can be enumerated more than once.
+                                       return Linq4j.enumerator(rows);
+                               }
+                       };
+               }
+
+               /** A null projection means "all columns, in declaration order". */
+               private int[] resolveProjects(int[] projects) {
+                       if (projects == null) {
+                               projects = new int[methodsForFields.size()];
+                               for (int i = 0; i < projects.length; i++) {
+                                       projects[i] = i;
+                               }
+                       }
+                       return projects;
+               }
+
+               @Override
+               public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+                       initialize((JavaTypeFactory) typeFactory);
+                       return this.rowType;
+               }
+
+               /**
+                * Computes the row type and the per-column extractors on first use; later calls are no-ops.
+                */
+               private synchronized void initialize(JavaTypeFactory typeFactory) {
+                       if (this.initialized) {
+                               return;
+                       }
+
+                       this.methodsForFields = new ArrayList<>();
+                       List<RelDataTypeField> fields = new ArrayList<>();
+
+                       for (Method method : this.referenceInterface.getMethods()) {
+                               if (method.getParameterCount() == 0) {
+                                       String fieldName = computeFieldName(method.getName());
+                                       if (fieldName != null) {
+                                               this.methodsForFields.add(extractorForMethod(method));
+                                               Class<?> retType = method.getReturnType();
+                                               // Enums are surfaced as strings, matching what extractorForMethod returns.
+                                               if (retType.isEnum()) {
+                                                       retType = String.class;
+                                               }
+                                               fields.add(new RelDataTypeFieldImpl(fieldName.toUpperCase(), fields.size(), typeFactory.createType(retType)));
+                                       }
+                               }
+                       }
+
+                       this.rowType = new MyDataType(fields, referenceInterface);
+                       this.initialized = true;
+               }
+
+               /**
+                * Returns a function that invokes {@code method} on its argument, converting enum results to their
+                * string form. A null enum result is passed through as null.
+                */
+               private Function<Object, Object> extractorForMethod(Method method) {
+                       return o -> {
+                               try {
+                                       Object ret = method.invoke(o);
+                                       if (ret != null && method.getReturnType().isEnum()) {
+                                               return ret.toString();
+                                       }
+                                       return ret;
+                               } catch (ReflectiveOperationException roe) {
+                                       throw new RuntimeException(roe);
+                               }
+                       };
+               }
+
+       }
+
+       /**
+        * Row type whose type string is the fully qualified name of the reference interface it was built for.
+        */
+       private static class MyDataType extends RelDataTypeImpl {
+               private final String typeString;
+
+               public MyDataType(List<? extends RelDataTypeField> fieldList, 
Class<?> refInterface) {
+                       super(fieldList);
+                       this.typeString = refInterface.getName();
+                       // Called only after typeString is assigned; presumably the digest is derived via
+                       // generateTypeString -- TODO confirm against RelDataTypeImpl.
+                       computeDigest();
+               }
+
+               @Override
+               protected void generateTypeString(StringBuilder sb, boolean 
withDetail) {
+                       sb.append(typeString);
+               }
+       }
+
+       /**
+        * Derives a field name from a method name using {@code FIELD_NAME_EXTRACTOR}.
+        *
+        * @return the concatenation of the pattern's first two groups, or null if the name does not match.
+        */
+       private static String computeFieldName(String methodName) {
+               Matcher nameMatcher = FIELD_NAME_EXTRACTOR.matcher(methodName);
+               if (!nameMatcher.matches()) {
+                       return null;
+               }
+               return nameMatcher.group(1) + nameMatcher.group(2);
+       }
+
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
new file mode 100644
index 0000000..21625a8
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobState;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * Tests that {@link JobInterruptionPredicate} fires its interruption callback only once the configured SQL
+ * predicate over job and task progress becomes true.
+ */
+public class JobInterruptionPredicateTest {
+
+       @Test
+       public void testJobPredicate() {
+               SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING, new ArrayList<>());
+               AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+               JobInterruptionPredicate predicate =
+                               new JobInterruptionPredicate(jobProgress, "SELECT completedTasks > 5 FROM jobProgress", () -> atomicBoolean.set(true), false);
+
+               predicate.runOneIteration();
+               Assert.assertFalse(atomicBoolean.get());
+
+               jobProgress.completedTasks = 6;
+
+               predicate.runOneIteration();
+               Assert.assertTrue(atomicBoolean.get());
+       }
+
+       @Test
+       public void testTaskPredicate() {
+               // Each task gets its own id so the fixtures represent two distinct tasks.
+               SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+               SettableTaskProgress t2 = new SettableTaskProgress("j1", "t2", WorkUnitState.WorkingState.RUNNING, false);
+
+               SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+                               Lists.newArrayList(t1, t2));
+               AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+               JobInterruptionPredicate predicate =
+                               new JobInterruptionPredicate(jobProgress, "SELECT count(*) > 0 FROM taskProgress WHERE workingState = 'FAILED'", () -> atomicBoolean.set(true), false);
+
+               predicate.runOneIteration();
+               Assert.assertFalse(atomicBoolean.get());
+
+               t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+               predicate.runOneIteration();
+               Assert.assertTrue(atomicBoolean.get());
+       }
+
+       @Test
+       public void testTaskAndJobPredicate() {
+               // Each task gets its own id so the fixtures represent two distinct tasks.
+               SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+               SettableTaskProgress t2 = new SettableTaskProgress("j1", "t2", WorkUnitState.WorkingState.RUNNING, false);
+
+               SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+                               Lists.newArrayList(t1, t2));
+               AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+               JobInterruptionPredicate predicate =
+                               new JobInterruptionPredicate(jobProgress,
+                                               "SELECT EXISTS(SELECT * FROM (SELECT completedTasks > 5 AS pred FROM jobProgress UNION SELECT count(*) > 0 AS pred FROM taskProgress WHERE workingState = 'FAILED') WHERE pred)",
+                                               () -> atomicBoolean.set(true), false);
+
+               predicate.runOneIteration();
+               Assert.assertFalse(atomicBoolean.get());
+
+               // The task-level half of the predicate triggers the interruption.
+               t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+               predicate.runOneIteration();
+               Assert.assertTrue(atomicBoolean.get());
+               atomicBoolean.set(false);
+
+               t2.workingState = WorkUnitState.WorkingState.RUNNING;
+
+               predicate.runOneIteration();
+               Assert.assertFalse(atomicBoolean.get());
+
+               // The job-level half of the predicate triggers the interruption.
+               jobProgress.completedTasks = 6;
+
+               predicate.runOneIteration();
+               Assert.assertTrue(atomicBoolean.get());
+       }
+
+       /** Mutable {@link JobProgress} whose fields the tests modify between predicate evaluations. */
+       @Getter
+       @AllArgsConstructor
+       public static class SettableJobProgress implements JobProgress {
+               private final String jobId;
+               private int taskCount;
+               private int completedTasks;
+               private long elapsedTime;
+               private JobState.RunningState runningState;
+               private List<TaskProgress> taskProgress;
+
+               @Override
+               public JobState.RunningState getState() {
+                       return this.runningState;
+               }
+       }
+
+       /** Mutable {@link TaskProgress} whose fields the tests modify between predicate evaluations. */
+       @Getter
+       @AllArgsConstructor
+       public static class SettableTaskProgress implements TaskProgress {
+               private final String jobId;
+               private final String taskId;
+               private WorkUnitState.WorkingState workingState;
+               private boolean isCompleted;
+       }
+
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
new file mode 100644
index 0000000..ba7dc4b
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+
+public class ReflectivePredicateEvaluatorTest {
+
+       @Test
+       public void simpleTest() throws Exception {
+               ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+                               "SELECT anInt = 1 FROM myInterface", 
MyInterface.class);
+
+               Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo")));
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, 
"foo")));
+
+               Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR 
aString = 'foo' FROM myInterface",
+                               new MyImplementation(1, "bar")));
+               Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR 
aString = 'foo' FROM myInterface",
+                               new MyImplementation(2, "foo")));
+               Assert.assertFalse(evaluator.evaluate("SELECT anInt = 1 OR 
aString = 'foo' FROM myInterface",
+                               new MyImplementation(2, "bar")));
+       }
+
+       @Test
+       public void testWithAggregations() throws Exception {
+               ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+                               "SELECT sum(anInt) = 5 FROM myInterface", 
MyInterface.class);
+
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"foo")));
+               Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation(4, "foo")));
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, 
"foo"), new MyImplementation(4, "foo")));
+       }
+
+       @Test
+       public void testWithAggregationsAndFilter() throws Exception {
+               ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+                               "SELECT sum(anInt) = 5 FROM myInterface WHERE 
aString = 'foo'", MyInterface.class);
+
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"foo")));
+               Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation(4, "foo"), new MyImplementation(4, "bar")));
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation(4, "foo"), new MyImplementation(4, "foo")));
+       }
+
+       @Test
+       public void testMultipleInterfaces() throws Exception {
+               ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+                               "SELECT true = ALL (SELECT sum(anInt) = 2 AS 
satisfied FROM myInterface UNION SELECT sum(anInt) = 3 AS satisfied FROM 
myInterface2)",
+                               MyInterface.class, MyInterface2.class);
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, 
"foo")));
+               Assert.assertTrue(evaluator.evaluate(new MyImplementation(2, 
"foo"), new MyImplementation2(3)));
+               Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation2(3), new MyImplementation(1, "foo")));
+       }
+
+       @Test
+       public void testMultipleOutputs() throws Exception {
+               ReflectivePredicateEvaluator evaluator =
+                               new ReflectivePredicateEvaluator("SELECT anInt 
= 1 FROM myInterface", MyInterface.class);
+               Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"bar"), new MyImplementation(1, "foo")));
+               Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"bar"), new MyImplementation(2, "foo")));
+       }
+
+       @Test
+       public void testInvalidSQL() throws Exception {
+               try {
+                       ReflectivePredicateEvaluator evaluator =
+                                       new 
ReflectivePredicateEvaluator("SELECT anInt FROM myInterface", 
MyInterface.class);
+                       Assert.fail();
+               } catch (IllegalArgumentException exc) {
+                       // Expected
+               }
+       }
+
+       @Test
+       public void testNoOutputs() throws Exception {
+               try {
+                       ReflectivePredicateEvaluator evaluator =
+                                       new 
ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface WHERE aString = 
'foo'",
+                                                       MyInterface.class);
+                       evaluator.evaluate(new MyImplementation(1, "bar"));
+                       Assert.fail();
+               } catch (IllegalArgumentException exc) {
+                       // Expected
+               }
+       }
+
+       private interface MyInterface {
+               int getAnInt();
+               String getAString();
+       }
+
+       @Data
+       private static class MyImplementation implements MyInterface {
+               private final int anInt;
+               private final String aString;
+       }
+
+       private interface MyInterface2 {
+               int getAnInt();
+       }
+
+       @Data
+       private static class MyImplementation2 implements MyInterface2 {
+               private final int anInt;
+       }
+
+}
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
new file mode 100644
index 0000000..20a2bf8
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of a basic FiniteStateMachine that allows keeping track of its state and gating certain
+ * logic on whether a transition is valid or not.
+ *
+ * This class is useful in situations where logic is complex, possibly 
multi-threaded, and can take multiple paths. Certain
+ * pieces of logic (for example running a job, publishing a dataset, etc) can 
only happen if other actions ended correctly,
+ * and the FSM is a way of simplifying the encoding and verification of those 
conditions. It is understood that state
+ * transitions may not be instantaneous, and that other state transitions 
should not start until the current one has
+ * been resolved.
+ *
+ * All public methods of this class will wait until the FSM is in a 
non-transitioning state. If multiple transitions are
+ * queued at the same time, the order in which they are executed is 
essentially random.
+ *
+ * The states supported by FSM can be enums or instances of any base type. The 
legality of a transition is determined
+ * by equality, i.e. if a transition A -> B is legal, the current state is A' 
and the desired end state is B', the transition
+ * will be legal if A.equals(A') && B.equals(B'). This allows for storing 
additional information into the current state
+ * as long as it does not affect the equality check (i.e. fields that are not 
compared in the equals check can store
+ * state metadata, etc.).
+ *
+ * Suggested Usage:
+ * FiniteStateMachine<MySymbols> fsm = new 
FiniteStateMachine.Builder().addTransition(START_SYMBOL, 
END_SYMBOL).build(initialSymbol);
+ *
+ * try (Transition transition = fsm.startTransition(MY_END_STATE)) {
+ *   try {
+ *     // my logic
+ *   } catch (MyException exc) {
+ *     transition.changeEndState(MY_ERROR);
+ *   }
+ * } catch (UnallowedTransitionException exc) {
+ *   // Cannot execute logic because it's an illegal transition!
+ * } catch (ReentrantStableStateWait exc) {
+ *      // Somewhere in the logic an instruction tried to do an operation with 
the fsm that would likely cause a deadlock
+ * } catch (AbandonedTransitionException exc) {
+ *   // Another thread initiated a transition and became inactive ending the 
transition
+ * } catch (InterruptedException exc) {
+ *   // Could not start transition because thread got interrupted while 
waiting for a non-transitioning state
+ * } catch (FailedTransitionCallbackException exc) {
+ *   // A callback in the transition start or end states has failed.
+ *   exc.getTransition().changeEndState(MY_ERROR).closeWithoutCallbacks(); // 
example handling
+ * }
+ *
+ * @param <T>
+ */
+@Slf4j
+public class FiniteStateMachine<T> {
+
+       /**
+        * Used to build a {@link FiniteStateMachine} instance. Each built machine receives its own copy of the
+        * configured transitions, so the builder can be reused or modified afterwards without affecting it.
+        */
+       public static class Builder<T> {
+               private final SetMultimap<T, T> allowedTransitions;
+               private final Set<T> universalEnds;
+               private T errorState;
+
+               public Builder() {
+                       this.allowedTransitions = HashMultimap.create();
+                       this.universalEnds = new HashSet<>();
+               }
+
+               /**
+                * Add a legal transition to the {@link FiniteStateMachine}.
+                */
+               public Builder<T> addTransition(T startState, T endState) {
+                       this.allowedTransitions.put(startState, endState);
+                       return this;
+               }
+
+               /**
+                * Specify that a state is a valid end state for a transition starting from any state. Useful for example for
+                * error states.
+                */
+               public Builder<T> addUniversalEnd(T state) {
+                       this.universalEnds.add(state);
+                       return this;
+               }
+
+               /**
+                * Specify the error state to which this machine can transition if nothing else is possible. Note the error state
+                * is always an allowed end state.
+                */
+               public Builder<T> errorState(T state) {
+                       this.errorState = state;
+                       return this;
+               }
+
+               /**
+                * Build a {@link FiniteStateMachine} starting at the given initial state.
+                */
+               public FiniteStateMachine<T> build(T initialState) {
+                       // Copy the mutable collections so later builder calls cannot alter an already-built machine.
+                       return new FiniteStateMachine<>(HashMultimap.create(this.allowedTransitions),
+                                       new HashSet<>(this.universalEnds), this.errorState, initialState);
+               }
+       }
+
+       // Legal (start state -> end states) pairs; consulted by isAllowedTransition.
+       private final SetMultimap<T, T> allowedTransitions;
+       // End states that are legal from any start state.
+       private final Set<T> universalEnds;
+       // State that is always accepted as an end state; may be null if none was configured.
+       private final T errorState;
+
+       private final ReentrantReadWriteLock lock;
+       // Condition of the write lock, awaited while another transition is in progress.
+       private final Condition condition;
+       // State the machine was created in; used by cloneAtInitialState.
+       private final T initialState;
+
+       private volatile T currentState;
+       // Non-null exactly while a transition is in progress.
+       private volatile Transition currentTransition;
+
+       /**
+        * Creates a machine positioned at {@code initialState}. If that state implements {@link StateWithCallbacks},
+        * its enter callback is invoked with a null previous state.
+        */
+       protected FiniteStateMachine(SetMultimap<T, T> allowedTransitions, Set<T> universalEnds, T errorState, T initialState) {
+               // Transition rules.
+               this.allowedTransitions = allowedTransitions;
+               this.universalEnds = universalEnds;
+               this.errorState = errorState;
+
+               // Synchronization primitives; the condition belongs to the write lock.
+               ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+               this.lock = readWriteLock;
+               this.condition = readWriteLock.writeLock().newCondition();
+
+               // State bookkeeping.
+               this.initialState = initialState;
+               this.currentState = initialState;
+
+               T enteredState = this.currentState;
+               if (enteredState instanceof StateWithCallbacks) {
+                       ((StateWithCallbacks) enteredState).onEnterState(null);
+               }
+       }
+
+       /**
+        * Start a transition to the end state specified. The returned {@link Transition} object is a closeable that will finalize
+        * the transition when it is closed. While the transition is open, no other transition can start.
+        *
+        * It is recommended to call this method only within a try-with-resource block to ensure the transition is closed.
+        *
+        * @param endState the state the machine should be in once the returned transition is closed.
+        * @return the open transition, owned by the calling thread.
+        * @throws UnallowedTransitionException If the transition is not allowed.
+        * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+        */
+       public Transition startTransition(T endState) throws UnallowedTransitionException, InterruptedException {
+               // Acquire the lock before entering the try block so the finally clause never unlocks a lock
+               // that this thread does not hold.
+               this.lock.writeLock().lock();
+               try {
+                       while (isTransitioning()) {
+                               this.condition.await();
+                       }
+                       if (!isAllowedTransition(this.currentState, endState)) {
+                               throw new UnallowedTransitionException(this.currentState, endState);
+                       }
+                       Transition transition = new Transition(endState);
+                       this.currentTransition = transition;
+                       return transition;
+               } finally {
+                       this.lock.writeLock().unlock();
+               }
+       }
+
+       /**
+        * Moves the machine straight to {@code endState}: opens a transition with {@link #startTransition(Object)}
+        * and closes it right away via {@link Transition#close()}.
+        *
+        * @throws UnallowedTransitionException if the transition is not allowed.
+        * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+        */
+       public void transitionImmediately(T endState) throws UnallowedTransitionException, InterruptedException, FailedTransitionCallbackException {
+               startTransition(endState).close();
+       }
+
+       /**
+        * Attempts an immediate transition to {@code endState}, treating an illegal transition as a no-op
+        * rather than an error.
+        *
+        * @return true if the transition happened, false if it was not allowed.
+        * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+        */
+       public boolean transitionIfAllowed(T endState) throws InterruptedException, FailedTransitionCallbackException {
+               try {
+                       transitionImmediately(endState);
+                       return true;
+               } catch (UnallowedTransitionException exc) {
+                       return false;
+               }
+       }
+
+       /**
+        * Get the current state. This method will wait until the FSM is in a non-transitioning state (although a transition
+        * may start immediately after).
+        *
+        * @return the state of the machine once no transition is in progress.
+        * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+        */
+       public T getCurrentState() throws InterruptedException {
+               // Need to get lock to make sure we're not in transitioning state. It is acquired before the try
+               // block so the finally clause never unlocks a lock that this thread does not hold.
+               this.lock.readLock().lock();
+               try {
+                       waitForNonTransitioningReadLock();
+
+                       return this.currentState;
+               } finally {
+                       this.lock.readLock().unlock();
+               }
+       }
+
+       /**
+        * @return the current state without taking any lock or waiting for an in-progress transition to finish.
+        */
+       @VisibleForTesting
+       T getCurrentStateEvenIfTransitioning() {
+               return this.currentState;
+       }
+
+       /**
+        * The clone is given the same transition collections, error state and initial state instances as this
+        * FSM (they are passed by reference, not copied). It does not wait for, or look at, any transition in progress.
+        *
+        * @return A clone of this FSM starting at the initial state of the FSM.
+        */
+       public FiniteStateMachine<T> cloneAtInitialState() {
+               return new FiniteStateMachine<>(this.allowedTransitions, 
this.universalEnds, this.errorState, this.initialState);
+       }
+
+       /**
+        * Waits until no transition is in progress and then clones the machine at whatever state it is in.
+        *
+        * @return A clone of this FSM starting at the current state of the FSM.
+        * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+        */
+       public FiniteStateMachine<T> cloneAtCurrentState() throws InterruptedException {
+               // Acquire the lock before entering the try block so the finally clause never unlocks a lock
+               // that this thread does not hold.
+               this.lock.readLock().lock();
+               try {
+                       waitForNonTransitioningReadLock();
+
+                       return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState, this.currentState);
+               } finally {
+                       this.lock.readLock().unlock();
+               }
+       }
+
+       /**
+        * Waits for a read lock in a non-transitioning state. The caller MUST hold the read lock before calling this method.
+        * The read lock is held again when this method exits, whether it returns normally or throws, so the
+        * caller can always release it in a finally block.
+        *
+        * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+        */
+       private void waitForNonTransitioningReadLock() throws InterruptedException {
+               if (isTransitioning()) {
+                       this.lock.readLock().unlock();
+                       // To use the condition, need to upgrade to a write lock
+                       this.lock.writeLock().lock();
+                       try {
+                               while (isTransitioning()) {
+                                       this.condition.await();
+                               }
+                       } finally {
+                               // Downgrade again to the read lock. This happens even when the wait failed: the caller
+                               // unconditionally unlocks the read lock, which would otherwise throw
+                               // IllegalMonitorStateException and mask the original exception.
+                               this.lock.readLock().lock();
+                               this.lock.writeLock().unlock();
+                       }
+               }
+       }
+
+       /**
+        * Tells whether a transition is currently open on this FSM.
+        *
+        * @return true if there is a transition in progress owned by another, live thread; false if there is none.
+        * @throws ReentrantStableStateWait if the transition in progress is owned by the calling thread.
+        * @throws AbandonedTransitionException if the thread owning the transition in progress is no longer alive.
+        */
+       private boolean isTransitioning() {
+               if (this.currentTransition == null) {
+                       return false;
+               }
+               if (Thread.currentThread().equals(this.currentTransition.ownerThread)) {
+                       // The owner waiting for its own transition to end would never make progress.
+                       throw new ReentrantStableStateWait(
+                                       "Tried to check for non-transitioning state from a thread that had already initiated a transition, "
+                                                       + "this may indicate a deadlock. To change end state use Transition.changeEndState() instead.");
+               }
+               if (!this.currentTransition.ownerThread.isAlive()) {
+                       throw new AbandonedTransitionException(this.currentTransition.ownerThread);
+               }
+               return true;
+       }
+
+       /**
+        * Checks whether moving from one state to another is legal. The error state and the universal end states can be
+        * reached from any state; any other end state must be registered as an allowed transition from the start state.
+        *
+        * @param startState the state the transition starts at.
+        * @param endState the state the transition would end at.
+        * @return true if the transition is allowed.
+        */
+       protected boolean isAllowedTransition(T startState, T endState) {
+               if (endState.equals(this.errorState) || this.universalEnds.contains(endState)) {
+                       return true;
+               }
+               Set<T> reachable = this.allowedTransitions.get(startState);
+               if (reachable == null) {
+                       return false;
+               }
+               return reachable.contains(endState);
+       }
+
+       /**
+        * A handle used for controlling the transition of the {@link 
FiniteStateMachine}. Note if this handle is lost the
+        * {@link FiniteStateMachine} will likely go into an invalid state.
+        */
+       public class Transition implements Closeable {
+               private final Thread ownerThread;
+               private volatile T endState;
+               private volatile boolean closed;
+
+               private Transition(T endState) {
+                       this.ownerThread = Thread.currentThread();
+                       this.endState = endState;
+                       this.closed = false;
+               }
+
+               /**
+                * Get the state at the beginning of this transition.
+                */
+               public T getStartState() {
+                       if (this.closed) {
+                               throw new IllegalStateException("Transition 
already closed.");
+                       }
+                       return FiniteStateMachine.this.currentState;
+               }
+
+               /**
+                * Change the end state of the transition. The new end state 
must be a legal transition for the state when the
+                * {@link Transition} was created.
+                *
+                * @throws UnallowedTransitionException if the new end state is 
not an allowed transition.
+                */
+               public synchronized void changeEndState(T endState) throws 
UnallowedTransitionException {
+                       if (this.closed) {
+                               throw new IllegalStateException("Transition 
already closed.");
+                       }
+                       if 
(!isAllowedTransition(FiniteStateMachine.this.currentState, endState)) {
+                               throw new 
UnallowedTransitionException(FiniteStateMachine.this.currentState, endState);
+                       }
+                       this.endState = endState;
+               }
+
+    /**
+     * Change the end state of the transition to the FSM error state.
+     */
+               public synchronized void switchEndStateToErrorState() {
+                 this.endState = FiniteStateMachine.this.errorState;
+    }
+
+               /**
+                * Close the current transition moving the {@link 
FiniteStateMachine} to the end state and releasing all locks.
+     *
+     * @throws FailedTransitionCallbackException when start or end state 
callbacks fail. Note if this exception is thrown
+     * the transition is not complete and the error must be handled to 
complete it.
+                */
+               @Override
+               public void close() throws FailedTransitionCallbackException {
+                       doClose(true);
+               }
+
+    /**
+     * Close the current transition moving the {@link FiniteStateMachine} to 
the end state and releasing all locks without
+     * calling any callbacks. This method should only be called after a {@link 
#close()} has failed and the failure
+     * cannot be handled.
+     */
+               public void closeWithoutCallbacks() {
+                 try {
+        doClose(false);
+      } catch (FailedTransitionCallbackException exc) {
+                   throw new IllegalStateException(String.format("Close 
without callbacks threw a %s. This is an error in code.",
+            FailedTransitionCallbackException.class), exc);
+      }
+    }
+
+               private synchronized void doClose(boolean withCallbacks) throws 
FailedTransitionCallbackException {
+      if (this.closed) {
+        return;
+      }
+
+      try {
+        FiniteStateMachine.this.lock.writeLock().lock();
+
+        try {
+          if (withCallbacks && getStartState() instanceof StateWithCallbacks) {
+            ((StateWithCallbacks<T>) 
getStartState()).onLeaveState(this.endState);
+          }
+        } catch (Throwable t) {
+          throw new FailedTransitionCallbackException(this, 
FailedCallback.START_STATE, t);
+        }
+
+        try {
+          if (withCallbacks && this.endState instanceof StateWithCallbacks) {
+            ((StateWithCallbacks) this.endState).onEnterState(getStartState());
+          }
+        } catch (Throwable t) {
+          throw new FailedTransitionCallbackException(this, 
FailedCallback.END_STATE, t);
+        }
+
+        this.closed = true;
+
+        FiniteStateMachine.this.currentState = this.endState;
+        FiniteStateMachine.this.currentTransition = null;
+        FiniteStateMachine.this.condition.signalAll();
+      } finally {
+        FiniteStateMachine.this.lock.writeLock().unlock();
+      }
+    }
+       }
+
+       /**
+        * Signals that a requested transition is not permitted by the FSM. Exposes the start and end states of the rejected
+        * transition through generated getters.
+        */
+       @Getter
+       public static class UnallowedTransitionException extends Exception {
+               private final Object startState;
+               private final Object endState;
+
+               public UnallowedTransitionException(Object startState, Object endState) {
+                       super(describe(startState, endState));
+                       this.startState = startState;
+                       this.endState = endState;
+               }
+
+               /** Builds the exception message for a rejected transition. */
+               private static String describe(Object from, Object to) {
+                       return String.format("Unallowed transition: %s -> %s", from, to);
+               }
+       }
+
+       /**
+        * Thrown when a thread that has started a transition is waiting for a non-transitioning state, which is a deadlock
+        * situation. This is an unchecked exception: the FSM raises it when it checks whether a transition is in progress
+        * and finds that the calling thread is the owner of that transition.
+        */
+       public static class ReentrantStableStateWait extends RuntimeException {
+               public ReentrantStableStateWait(String message) {
+                       super(message);
+               }
+       }
+
+       /**
+        * Thrown when a transition was initiated by a thread that no longer exists, likely implying that the transition can
+        * never be closed. The thread that initiated the transition is exposed through a generated getter, consistent with
+        * the other exceptions of the FSM.
+        */
+       @Getter
+       public static class AbandonedTransitionException extends RuntimeException {
+               private final Thread startingThread;
+
+               public AbandonedTransitionException(Thread startingThread) {
+                       super(String.format("Thread %s initiated a transition but became inactive before closing it.", startingThread));
+                       this.startingThread = startingThread;
+               }
+       }
+
+       /**
+        * Identifies which callback failed while closing a transition.
+        */
+       public enum FailedCallback {
+               /** The leave callback of the state the transition started at. */
+               START_STATE,
+               /** The enter callback of the state the transition ends at. */
+               END_STATE
+       }
+
+  /**
+   * Thrown when the callbacks when closing a transition fail. Carries the transition that could not be closed, which
+   * of the two callbacks failed, and the throwable raised by that callback (also passed to the superclass as the
+   * cause). All three are exposed through generated getters.
+   */
+  @Getter
+       public static class FailedTransitionCallbackException extends 
IOException {
+         private final FiniteStateMachine.Transition transition; // NOTE(review): raw type; presumably so handlers can call changeEndState(...) on it - confirm
+         private final FailedCallback failedCallback;
+         private final Throwable originalException;
+
+    public FailedTransitionCallbackException(FiniteStateMachine<?>.Transition 
transition, FailedCallback failedCallback,
+        Throwable originalException) {
+      super("Failed callbacks when ending transition.", originalException);
+      this.transition = transition;
+      this.failedCallback = failedCallback;
+      this.originalException = originalException;
+    }
+  }
+}
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
new file mode 100644
index 0000000..9a77a7d
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * A state for a {@link FiniteStateMachine} which supports callbacks when entering and leaving the state. Both
+ * callbacks have empty default implementations, so a state only needs to override the ones it cares about.
+ * @param <T> supertype of states in the FSM.
+ */
+public interface StateWithCallbacks<T> {
+
+       /**
+        * Called when an FSM reaches this state.
+        * @param previousState the previous state of the machine.
+        */
+       default void onEnterState(@Nullable T previousState) {
+               // do nothing
+       }
+
+       /**
+        * Called when an FSM leaves this state.
+        * @param nextState the next state of the machine.
+        */
+       default void onLeaveState(T nextState) {
+               // do nothing
+       }
+
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
new file mode 100644
index 0000000..ec50d02
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class FiniteStateMachineTest {
+
+       public enum MyStates {
+               PENDING, RUNNING, SUCCESS, ERROR
+       }
+
+       private final FiniteStateMachine<MyStates> refFsm = new 
FiniteStateMachine.Builder<MyStates>()
+                       .addTransition(MyStates.PENDING, MyStates.RUNNING)
+                       .addTransition(MyStates.RUNNING, MyStates.SUCCESS)
+                       .addTransition(MyStates.PENDING, MyStates.ERROR)
+                       .addTransition(MyStates.RUNNING, 
MyStates.ERROR).build(MyStates.PENDING);
+
+       @Test
+       public void singleThreadImmediateTransitionsTest() throws Exception {
+               FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+               fsm.transitionImmediately(MyStates.RUNNING);
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+               fsm.transitionImmediately(MyStates.SUCCESS);
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.SUCCESS);
+
+               fsm = fsm.cloneAtInitialState();
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+               fsm.transitionImmediately(MyStates.ERROR);
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+
+               fsm = fsm.cloneAtCurrentState();
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+       }
+
+       /**
+        * Checks that transitions which were not registered are rejected and leave the FSM state untouched.
+        */
+       @Test
+       public void illegalTransitionsTest() throws Exception {
+               FiniteStateMachine<MyStates> machine = refFsm.cloneAtInitialState();
+               Assert.assertEquals(machine.getCurrentState(), MyStates.PENDING);
+
+               // Neither PENDING -> PENDING nor PENDING -> SUCCESS was registered
+               for (MyStates forbidden : new MyStates[] {MyStates.PENDING, MyStates.SUCCESS}) {
+                       try {
+                               machine.transitionImmediately(forbidden);
+                               Assert.fail();
+                       } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+                               // expected
+                       }
+                       Assert.assertEquals(machine.getCurrentState(), MyStates.PENDING);
+               }
+
+               // A legal transition still works after the rejected attempts
+               machine.transitionImmediately(MyStates.RUNNING);
+               Assert.assertEquals(machine.getCurrentState(), MyStates.RUNNING);
+       }
+
+       /**
+        * Exercises transitions that stay open for a while: the owning thread must not be able to wait for a stable state
+        * or start another transition, and the end state can be changed before the transition is closed.
+        */
+       @Test
+       public void slowTransitionsTest() throws Exception {
+               FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+               // Parameterized type (not the raw FiniteStateMachine.Transition), consistent with the block below
+               try (FiniteStateMachine<MyStates>.Transition transition = fsm.startTransition(MyStates.RUNNING)) {
+                       try {
+                               fsm.getCurrentState();
+                               Assert.fail();
+                       } catch (FiniteStateMachine.ReentrantStableStateWait exc) {
+                               // Expected because the same thread that is transitioning tries to read the current state
+                       }
+                       try {
+                               fsm.transitionImmediately(MyStates.RUNNING);
+                               Assert.fail();
+                       } catch (FiniteStateMachine.ReentrantStableStateWait exc) {
+                               // Expected because the same thread that is transitioning tries to start another transition
+                       }
+               }
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+
+               try (FiniteStateMachine<MyStates>.Transition transition = fsm.startTransition(MyStates.SUCCESS)) {
+                       transition.changeEndState(MyStates.ERROR);
+               }
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+       }
+
+       @Test
+       public void callbackTest() throws Exception {
+         NamedStateWithCallback stateA = new NamedStateWithCallback("a");
+    NamedStateWithCallback stateB = new NamedStateWithCallback("b");
+    NamedStateWithCallback stateC = new NamedStateWithCallback("c", null, s -> 
{
+      throw new RuntimeException("leave");
+    });
+    NamedStateWithCallback stateD = new NamedStateWithCallback("d");
+
+    FiniteStateMachine<NamedStateWithCallback> fsm = new 
FiniteStateMachine.Builder<NamedStateWithCallback>()
+        .addTransition(new NamedStateWithCallback("a"), new 
NamedStateWithCallback("b"))
+        .addTransition(new NamedStateWithCallback("b"), new 
NamedStateWithCallback("c"))
+        .addTransition(new NamedStateWithCallback("c"), new 
NamedStateWithCallback("d"))
+        .addUniversalEnd(new NamedStateWithCallback("ERROR"))
+        .build(stateA);
+
+    fsm.transitionImmediately(stateB);
+
+    Assert.assertEquals(fsm.getCurrentState(), stateB);
+    Assert.assertEquals(stateA.lastTransition, "leave:a->b");
+    stateA.lastTransition = "";
+    Assert.assertEquals(stateB.lastTransition, "enter:a->b");
+    stateB.lastTransition = "";
+
+    try {
+      // State that will error on enter
+      fsm.transitionImmediately(new NamedStateWithCallback("c", s -> {
+        throw new RuntimeException("enter");
+      }, s -> {
+        throw new RuntimeException("leave");
+      }));
+      Assert.fail("Expected excpetion");
+    } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+      Assert.assertEquals(exc.getFailedCallback(), 
FiniteStateMachine.FailedCallback.END_STATE);
+      Assert.assertEquals(exc.getOriginalException().getMessage(), "enter");
+      // switch state to one that will only error on leave
+      exc.getTransition().changeEndState(stateC);
+      exc.getTransition().close();
+    }
+
+    Assert.assertEquals(fsm.getCurrentState(), stateC);
+    Assert.assertEquals(stateB.lastTransition, "leave:b->c");
+    stateB.lastTransition = "";
+    Assert.assertEquals(stateC.lastTransition, "enter:b->c");
+    stateC.lastTransition = "";
+
+    try {
+      fsm.transitionImmediately(stateD);
+      Assert.fail("Expected exception");
+    } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+      Assert.assertEquals(exc.getFailedCallback(), 
FiniteStateMachine.FailedCallback.START_STATE);
+      Assert.assertEquals(exc.getOriginalException().getMessage(), "leave");
+      // switch state to one that will only error on leave
+      exc.getTransition().changeEndState(new NamedStateWithCallback("ERROR"));
+      exc.getTransition().closeWithoutCallbacks();
+    }
+
+    Assert.assertEquals(fsm.getCurrentState(), new 
NamedStateWithCallback("ERROR"));
+    Assert.assertEquals(stateD.lastTransition, "");
+  }
+
+       @Test(timeOut = 5000)
+       public void multiThreadTest() throws Exception {
+               FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+               Transitioner<MyStates> t1 = new Transitioner<>(fsm, 
MyStates.RUNNING);
+               Transitioner<MyStates> t2 = new Transitioner<>(fsm, 
MyStates.ERROR);
+
+               Thread t1Thread = new Thread(null, t1, "t1");
+               t1Thread.start();
+               t1.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+               Assert.assertEquals(t1.transitionResult, 
TransitionState.TRANSITIONING);
+               Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), 
MyStates.PENDING);
+
+               Thread t2Thread = new Thread(null, t2, "t2");
+               t2Thread.start();
+               Assert.assertEquals(t1.transitionResult, 
TransitionState.TRANSITIONING);
+               Assert.assertEquals(t2.transitionResult, 
TransitionState.STARTING);
+               Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), 
MyStates.PENDING);
+
+               t1Thread.interrupt();
+               t1.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+               t2.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+               Assert.assertEquals(t1.transitionResult, 
TransitionState.COMPLETED);
+               Assert.assertEquals(t2.transitionResult, 
TransitionState.TRANSITIONING);
+               Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), 
MyStates.RUNNING);
+
+               t2Thread.interrupt();
+               t2.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+               Assert.assertEquals(t1.transitionResult, 
TransitionState.COMPLETED);
+               Assert.assertEquals(t2.transitionResult, 
TransitionState.COMPLETED);
+               Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(), 
MyStates.ERROR);
+       }
+
+       /**
+        * Checks that a transition whose owner thread died without closing it is reported as abandoned to other threads.
+        */
+       @Test(timeOut = 5000)
+       public void deadThreadTest() throws Exception {
+               FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+               Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+               Thread t = new Thread(() -> {
+                       try {
+                               // The returned transition is deliberately dropped without being closed, so it becomes orphaned
+                               // when this thread ends
+                               fsm.startTransition(MyStates.RUNNING);
+                       } catch (FiniteStateMachine.UnallowedTransitionException | InterruptedException exc) {
+                               // do nothing
+                       }
+               });
+               t.start();
+
+               // Wait for the thread to die; join() returns as soon as it ends instead of polling isAlive() with sleeps
+               t.join();
+
+               try {
+                       fsm.transitionImmediately(MyStates.RUNNING);
+                       Assert.fail();
+               } catch (FiniteStateMachine.AbandonedTransitionException exc) {
+                       // Expected
+               }
+       }
+
+       /**
+        * Runnable that opens a transition to a given end state and holds it open until its thread is interrupted (or two
+        * seconds elapse). Progress is published through {@code transitionResult}, and other threads can block on it with
+        * {@link #awaitState(Set)}.
+        */
+       @Data
+       private class Transitioner<T> implements Runnable {
+               private final FiniteStateMachine<T> fsm;
+               private final T endState;
+
+               private final Lock lock = new ReentrantLock();
+               private final Condition condition = lock.newCondition();
+
+               private volatile boolean running = false;
+               private volatile TransitionState transitionResult = TransitionState.STARTING;
+
+               @Override
+               public void run() {
+                       try (FiniteStateMachine<T>.Transition transition = this.fsm.startTransition(this.endState)) {
+                               goToState(TransitionState.TRANSITIONING);
+                               try {
+                                       Thread.sleep(2000);
+                                       // Nobody interrupted us: publish the timeout through goToState so that threads blocked in
+                                       // awaitState are woken up, like for every other state change
+                                       goToState(TransitionState.TIMEOUT);
+                                       return;
+                               } catch (InterruptedException ie) {
+                                       // This is the signal to end the state transition, so do nothing
+                               }
+                       } catch (InterruptedException exc) {
+                               goToState(TransitionState.INTERRUPTED);
+                               return;
+                       } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+                               goToState(TransitionState.UNALLOWED);
+                               return;
+                       } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+                               goToState(TransitionState.CALLBACK_ERROR);
+                               return;
+                       }
+                       goToState(TransitionState.COMPLETED);
+               }
+
+               /**
+                * Blocks until the published transition state is one of the given states.
+                *
+                * @param states the states to wait for.
+                * @throws InterruptedException if the calling thread is interrupted while waiting.
+                */
+               public void awaitState(Set<TransitionState> states) throws InterruptedException {
+                       // Acquire before the try block so the finally clause only releases a lock that was acquired
+                       this.lock.lock();
+                       try {
+                               while (!states.contains(this.transitionResult)) {
+                                       this.condition.await();
+                               }
+                       } finally {
+                               this.lock.unlock();
+                       }
+               }
+
+               /**
+                * Publishes a new transition state and wakes up all threads blocked in {@link #awaitState(Set)}.
+                *
+                * @param state the state to publish.
+                */
+               public void goToState(TransitionState state) {
+                       this.lock.lock();
+                       try {
+                               this.transitionResult = state;
+                               this.condition.signalAll();
+                       } finally {
+                               this.lock.unlock();
+                       }
+               }
+       }
+
+       enum TransitionState {
+               STARTING, TRANSITIONING, COMPLETED, INTERRUPTED, UNALLOWED, 
TIMEOUT, CALLBACK_ERROR
+       }
+
+       @RequiredArgsConstructor
+  @EqualsAndHashCode(of = "name")
+       public static class NamedStateWithCallback implements 
StateWithCallbacks<NamedStateWithCallback> {
+         @Getter
+         private final String name;
+         private final Function<NamedStateWithCallback, Void> enterCallback;
+         private final Function<NamedStateWithCallback, Void> leaveCallback;
+
+         String lastTransition = "";
+
+    public NamedStateWithCallback(String name) {
+      this(name, null, null);
+    }
+
+    private void setLastTransition(String callback, NamedStateWithCallback 
start, NamedStateWithCallback end) {
+      this.lastTransition = String.format("%s:%s->%s", callback, start == null 
? "null" : start.name, end.name);
+    }
+
+    @Override
+    public void onEnterState(@Nullable NamedStateWithCallback previousState) {
+      if (this.enterCallback == null) {
+        setLastTransition("enter", previousState, this);
+      } else {
+        this.enterCallback.apply(previousState);
+      }
+    }
+
+    @Override
+    public void onLeaveState(NamedStateWithCallback nextState) {
+      if (this.leaveCallback == null) {
+        setLastTransition("leave", this, nextState);
+      } else {
+        this.leaveCallback.apply(nextState);
+      }
+    }
+  }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle 
b/gradle/scripts/defaultBuildProperties.gradle
index 8509cc6..e772acd 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -25,7 +25,7 @@ def BuildProperties BUILD_PROPERTIES = new 
BuildProperties(project)
     .register(new BuildProperty("nexusArtifactSnapshotRepository", "https://repository.apache.org/content/repositories/snapshots", "Maven repository to publish artifacts"))
     .register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies 
version"))
     .register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies 
version"))
-    .register(new BuildProperty("bytemanVersion", "2.2.1", "Byteman 
dependencies version"))
+    .register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman 
dependencies version"))
     .register(new BuildProperty("confluentVersion", "2.0.1", "confluent 
dependencies version"))
     .register(new BuildProperty("doNotSignArtifacts", false, "Do not sight 
Maven artifacts"))
     .register(new BuildProperty("gobblinFlavor", "standard", "Build flavor 
(see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index f7ff29c..0009397 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -121,8 +121,8 @@ ext.externalDependency = [
     "bytemanBmunit": "org.jboss.byteman:byteman-bmunit:" + bytemanVersion,
     "bcpgJdk15on": "org.bouncycastle:bcpg-jdk15on:1.52",
     "bcprovJdk15on": "org.bouncycastle:bcprov-jdk15on:1.52",
-    "calciteCore": "org.apache.calcite:calcite-core:1.3.0-incubating",
-    "calciteAvatica": "org.apache.calcite:calcite-avatica:1.3.0-incubating",
+    "calciteCore": "org.apache.calcite:calcite-core:1.16.0",
+    "calciteAvatica": "org.apache.calcite:calcite-avatica:1.13.0",
     "jhyde": "org.pentaho:pentaho-aggdesigner-algorithm:5.1.5-jhyde",
     "curatorFramework": "org.apache.curator:curator-framework:2.10.0",
     "curatorRecipes": "org.apache.curator:curator-recipes:2.10.0",
diff --git a/gradle/scripts/globalDependencies.gradle 
b/gradle/scripts/globalDependencies.gradle
index d64db67..270a121 100644
--- a/gradle/scripts/globalDependencies.gradle
+++ b/gradle/scripts/globalDependencies.gradle
@@ -43,6 +43,7 @@ subprojects {
         // Required to add JDK's tool jar, which is required to run byteman 
tests.
         testCompile (files(((URLClassLoader) 
ToolProvider.getSystemToolClassLoader()).getURLs()))
       }
+      all*.exclude group: 'org.apache.calcite', module: 'calcite-avatica' // 
replaced by org.apache.calcite.avatica:avatica-core
     }
   }
 }

Reply via email to