This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9e95bd3 [GOBBLIN-677] Allow early termination of Gobblin jobs based
on a predicate on the job progress
9e95bd3 is described below
commit 9e95bd32d6115175b532dbbd9f7e73bae8059c02
Author: ibuenros <[email protected]>
AuthorDate: Mon Mar 4 10:08:31 2019 -0800
[GOBBLIN-677] Allow early termination of Gobblin jobs based on a predicate
on the job progress
Closes #2548 from ibuenros/early-termination-2
---
.../gobblin/runtime/JobShutdownException.java | 27 ++
.../apache/gobblin/source/extractor/Extractor.java | 31 +-
.../runtime/embedded/EmbeddedGobblinDistcp.java | 2 +-
.../embedded/EmbeddedGobblinDistcpTest.java | 1 +
.../extractor/extract/kafka/KafkaExtractor.java | 14 +
gobblin-runtime/build.gradle | 5 +-
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 42 +-
.../java/org/apache/gobblin/runtime/JobState.java | 23 +-
.../main/java/org/apache/gobblin/runtime/Task.java | 8 +-
.../java/org/apache/gobblin/runtime/TaskState.java | 3 +-
.../gobblin/runtime/embedded/EmbeddedGobblin.java | 7 +-
.../runtime/job/GobblinJobFiniteStateMachine.java | 180 ++++++++
.../runtime/job/JobInterruptionPredicate.java | 111 +++++
.../apache/gobblin/runtime/job/JobProgress.java | 38 ++
.../apache/gobblin/runtime/job/TaskProgress.java | 33 ++
.../gobblin/runtime/local/LocalJobLauncher.java | 5 +
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 126 ++++--
.../spec_catalog/SpecCatalogListenersList.java | 34 --
.../gobblin/util/ReflectivePredicateEvaluator.java | 370 ++++++++++++++++
.../runtime/job/JobInterruptionPredicateTest.java | 133 ++++++
.../util/ReflectivePredicateEvaluatorTest.java | 125 ++++++
.../org/apache/gobblin/fsm/FiniteStateMachine.java | 463 +++++++++++++++++++++
.../org/apache/gobblin/fsm/StateWithCallbacks.java | 45 ++
.../apache/gobblin/fsm/FiniteStateMachineTest.java | 344 +++++++++++++++
gradle/scripts/defaultBuildProperties.gradle | 2 +-
gradle/scripts/dependencyDefinitions.gradle | 4 +-
gradle/scripts/globalDependencies.gradle | 1 +
27 files changed, 2095 insertions(+), 82 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
new file mode 100644
index 0000000..2c7d55a
--- /dev/null
+++
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/JobShutdownException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+/**
+ * An exception thrown when a job cannot be gracefully shut down.
+ */
+public class JobShutdownException extends Exception {
+ public JobShutdownException(String message) {
+ super(message);
+ }
+}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
index 9749795..2ea900b 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/Extractor.java
@@ -23,8 +23,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
+import org.apache.gobblin.util.Decorator;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import io.reactivex.Emitter;
@@ -54,7 +56,7 @@ public interface Extractor<S, D> extends Closeable {
* @return schema of the extracted data records
* @throws java.io.IOException if there is problem getting the schema
*/
- public S getSchema() throws IOException;
+ S getSchema() throws IOException;
/**
* Read the next data record from the data source.
@@ -78,7 +80,7 @@ public interface Extractor<S, D> extends Closeable {
*
* @return the expected source record count
*/
- public long getExpectedRecordCount();
+ long getExpectedRecordCount();
/**
* Get the calculated high watermark up to which data records are to be
extracted.
@@ -88,7 +90,23 @@ public interface Extractor<S, D> extends Closeable {
* <a
href="https://github.com/linkedin/gobblin/wiki/Watermarks">Watermarks</a> for
more information.
*/
@Deprecated
- public long getHighWatermark();
+ long getHighWatermark();
+
+ /**
+ * Called to notify the Extractor it should shut down as soon as possible.
If this call returns successfully, the task
+ * will continue consuming records from the Extractor and continue execution
normally. The extractor should only emit
+ * those records necessary to stop at a graceful committable state. Most job
executors will eventually kill the task
+ * if the Extractor does not stop emitting records after a few seconds.
+ *
+ * @throws JobShutdownException if the extractor does not support early
termination. This will cause the task to fail.
+ */
+ default void shutdown() throws JobShutdownException {
+ if (this instanceof Decorator && ((Decorator) this).getDecoratedObject()
instanceof Extractor) {
+ ((Extractor) ((Decorator) this).getDecoratedObject()).shutdown();
+ } else {
+ throw new JobShutdownException(this.getClass().getName() + ": Extractor
does not support shutdown.");
+ }
+ }
/**
* Read an {@link RecordEnvelope}. By default, just wrap {@link
#readRecord(Object)} in a {@link RecordEnvelope}.
@@ -115,7 +133,12 @@ public interface Extractor<S, D> extends Closeable {
S schema = getSchema();
Flowable<StreamEntity<D>> recordStream = Flowable.generate(() ->
shutdownRequest, (BiConsumer<AtomicBoolean, Emitter<StreamEntity<D>>>) (state,
emitter) -> {
if (state.get()) {
- emitter.onComplete();
+ // shutdown requested
+ try {
+ shutdown();
+ } catch (JobShutdownException exc) {
+ emitter.onError(exc);
+ }
}
try {
StreamEntity<D> record = readStreamEntity();
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
index e52748f..f7076bd 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
@@ -82,7 +82,7 @@ public class EmbeddedGobblinDistcp extends EmbeddedGobblin {
this.setConfiguration(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
to.getFileSystem(new Configuration()).getUri().toString());
// add gobblin-data-management jar to distributed jars
- this.distributeJar(ClassUtil.findContainingJar(CopySource.class));
+ this.distributeJarByClassWithPriority(CopySource.class, 0);
}
/**
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 3b289c0..c12dea4 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index e54a4b6..0a4685f 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -23,7 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +114,8 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
private long currentPartitionReadRecordTime = 0;
protected D currentPartitionLastSuccessfulRecord = null;
+ private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+
public KafkaExtractor(WorkUnitState state) {
super(state);
this.workUnitState = state;
@@ -164,6 +168,10 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
@SuppressWarnings("unchecked")
@Override
public D readRecordImpl(D reuse) throws DataRecordException, IOException {
+ if (this.shutdownRequested.get()) {
+ return null;
+ }
+
long readStartTime = System.nanoTime();
while (!allPartitionsFinished()) {
@@ -245,6 +253,12 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
return null;
}
+ @Override
+ public void shutdown()
+ throws JobShutdownException {
+ this.shutdownRequested.set(true);
+ }
+
private boolean allPartitionsFinished() {
return this.currentPartitionIdx != INITIAL_PARTITION_IDX &&
this.currentPartitionIdx >= this.highWatermark.size();
}
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 88e0c10..71dd510 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -49,6 +49,8 @@ dependencies {
compile externalDependency.avro
compile externalDependency.avroMapredH2
+ compile externalDependency.calciteCore
+ //compile externalDependency.calciteAvatica
compile externalDependency.commonsCli
compile externalDependency.commonsConfiguration
compile externalDependency.commonsEmail
@@ -83,8 +85,6 @@ dependencies {
compile externalDependency.kryo
testCompile project(path: ":gobblin-metastore", configuration:
"testFixtures")
- testCompile externalDependency.calciteCore
- testCompile externalDependency.calciteAvatica
testCompile externalDependency.jhyde
testCompile externalDependency.testng
testCompile externalDependency.hamcrest
@@ -150,6 +150,7 @@ jmh {
}
test {
+ systemProperty "org.jboss.byteman.verbose", "true"
workingDir rootProject.rootDir
maxParallelForks = 1
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e800107..ccc94cb 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -28,8 +28,8 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +60,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import javax.annotation.Nullable;
+import lombok.Setter;
/**
@@ -93,6 +94,8 @@ public class GobblinMultiTaskAttempt {
private final Optional<String> containerIdOptional;
private final Optional<StateStore<TaskState>> taskStateStoreOptional;
private final SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+ @Setter
+ private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) ->
false;
private List<Task> tasks;
/**
@@ -139,16 +142,39 @@ public class GobblinMultiTaskAttempt {
this.tasks = runWorkUnits(countDownLatch);
log.info("Waiting for submitted tasks of job {} to complete in container
{}...", jobId,
containerIdOptional.or(""));
- while (countDownLatch.getCount() > 0) {
- log.info(String.format("%d out of %d tasks of job %s are running in
container %s", countDownLatch.getCount(),
- countDownLatch.getRegisteredParties(), jobId,
containerIdOptional.or("")));
- if (countDownLatch.await(10, TimeUnit.SECONDS)) {
- break;
+ try {
+ while (countDownLatch.getCount() > 0) {
+ if (this.interruptionPredicate.test(this)) {
+ log.info("Interrupting task execution due to satisfied predicate.");
+ interruptTaskExecution(countDownLatch);
+ break;
+ }
+ log.info(String.format("%d out of %d tasks of job %s are running in
container %s", countDownLatch.getCount(),
+ countDownLatch.getRegisteredParties(), jobId,
containerIdOptional.or("")));
+ if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+ break;
+ }
}
+ } catch (InterruptedException interrupt) {
+ log.info("Job interrupted by InterrupedException.");
+ interruptTaskExecution(countDownLatch);
}
log.info("All assigned tasks of job {} have completed in container {}",
jobId, containerIdOptional.or(""));
}
+ private void interruptTaskExecution(CountDownLatch countDownLatch) throws
InterruptedException {
+ log.info("Job interrupted. Attempting a graceful shutdown of the job.");
+ this.tasks.forEach(Task::shutdown);
+ if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
+ log.warn("Graceful shutdown of job timed out. Killing all outstanding
tasks.");
+ try {
+ this.taskExecutor.shutDown();
+ } catch (Throwable t) {
+ throw new RuntimeException("Failed to shutdown task executor.", t);
+ }
+ }
+ }
+
/**
* Commit {@link #tasks} by 1. calling {@link Task#commit()} in parallel; 2.
executing any additional {@link CommitStep};
* 3. persist task statestore.
@@ -468,7 +494,8 @@ public class GobblinMultiTaskAttempt {
public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String
containerId, JobState jobState,
List<WorkUnit> workUnits, TaskStateTracker taskStateTracker,
TaskExecutor taskExecutor,
StateStore<TaskState> taskStateStore,
- CommitPolicy multiTaskAttemptCommitPolicy,
SharedResourcesBroker<GobblinScopeTypes> jobBroker)
+ CommitPolicy multiTaskAttemptCommitPolicy,
SharedResourcesBroker<GobblinScopeTypes> jobBroker,
+ Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
throws IOException, InterruptedException {
// dump the work unit if tracking logs are enabled
@@ -480,6 +507,7 @@ public class GobblinMultiTaskAttempt {
GobblinMultiTaskAttempt multiTaskAttempt =
new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState,
taskStateTracker, taskExecutor,
Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
+ multiTaskAttempt.setInterruptionPredicate(interruptionPredicate);
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
return multiTaskAttempt;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index c0f9271..45d3876 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.job.JobProgress;
+import org.apache.gobblin.runtime.job.TaskProgress;
import org.apache.hadoop.io.Text;
import com.codahale.metrics.Counter;
@@ -67,7 +69,7 @@ import org.apache.gobblin.util.ImmutableProperties;
*
* @author Yinan Li
*/
-public class JobState extends SourceState {
+public class JobState extends SourceState implements JobProgress {
/**
* An enumeration of possible job states, which are identical to
@@ -248,6 +250,20 @@ public class JobState extends SourceState {
}
/**
+ * Get the currently elapsed time for this job.
+ * @return elapsed milliseconds: end minus start once ended, now minus start while running, 0 if not started
+ */
+ public long getElapsedTime() {
+ if (this.endTime > 0) {
+ return this.endTime - this.startTime;
+ }
+ if (this.startTime > 0) {
+ return System.currentTimeMillis() - this.startTime;
+ }
+ return 0;
+ }
+
+ /**
* Set job end time.
*
* @param endTime job end time
@@ -393,6 +409,11 @@ public class JobState extends SourceState {
return
ImmutableList.<TaskState>builder().addAll(this.taskStates.values()).build();
}
+ @Override
+ public List<TaskState> getTaskProgress() {
+ return getTaskStates();
+ }
+
/**
* Create a {@link Map} from dataset URNs (as being specified by {@link
ConfigurationKeys#DATASET_URN_KEY} to
* {@link DatasetState} objects that represent the dataset states and store
{@link TaskState}s corresponding
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 049a3ff..8ddd1d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -462,7 +462,7 @@ public class Task implements TaskIFace {
RecordEnvelope recordEnvelope;
// Extract, convert, and fork one source record at a time.
- while (!shutdownRequested() && (recordEnvelope =
extractor.readRecordEnvelope()) != null) {
+ while ((recordEnvelope = extractor.readRecordEnvelope()) != null) {
onRecordExtract();
AcknowledgableWatermark ackableWatermark = new
AcknowledgableWatermark(recordEnvelope.getWatermark());
if (watermarkTracker.isPresent()) {
@@ -473,6 +473,9 @@ public class Task implements TaskIFace {
ackableWatermark.incrementAck());
}
ackableWatermark.ack();
+ if (shutdownRequested()) {
+ extractor.shutdown();
+ }
}
} else {
RecordEnvelope record;
@@ -495,6 +498,9 @@ public class Task implements TaskIFace {
throw new RuntimeException(e);
}
}
+ if (shutdownRequested()) {
+ extractor.shutdown();
+ }
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index a9d99be..50bba37 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
+import org.apache.gobblin.runtime.job.TaskProgress;
import org.apache.hadoop.io.Text;
import com.codahale.metrics.Counter;
@@ -59,7 +60,7 @@ import lombok.Getter;
*
* @author Yinan Li
*/
-public class TaskState extends WorkUnitState {
+public class TaskState extends WorkUnitState implements TaskProgress {
// Built-in metric names
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index b6cc3b5..dced320 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -200,7 +200,12 @@ public class EmbeddedGobblin {
* will appear first in the classpath. Default priority is 0.
*/
public EmbeddedGobblin distributeJarByClassWithPriority(Class<?> klazz, int
priority) {
- return distributeJarWithPriority(ClassUtil.findContainingJar(klazz),
priority);
+ String jar = ClassUtil.findContainingJar(klazz);
+ if (jar == null) {
+ log.warn(String.format("Could not find jar for class %s. This is normal
in test runs.", klazz));
+ return this;
+ }
+ return distributeJarWithPriority(jar, priority);
}
/**
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
new file mode 100644
index 0000000..d36f68c
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/GobblinJobFiniteStateMachine.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.io.IOException;
+
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.JobState;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link FiniteStateMachine} implementation to track the state of a Gobblin
job executor.
+ */
+@Slf4j
+public class GobblinJobFiniteStateMachine extends
FiniteStateMachine<GobblinJobFiniteStateMachine.JobFSMState> {
+
+ /**
+ * Types of state the job can be in.
+ */
+ public enum StateType {
+ PREPARING, RUNNING, INTERRUPTED, CANCELLED, SUCCESS, FAILED
+ }
+
+ /**
+ * State of a job.
+ */
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @EqualsAndHashCode(of = "stateType")
+ @ToString
+ @Getter
+ public static class JobFSMState {
+ private final StateType stateType;
+ }
+
+ /**
+ * A special {@link JobFSMState} that is aware of how to interrupt a
running job.
+ */
+ private class RunnableState extends JobFSMState implements
StateWithCallbacks<JobFSMState> {
+ private final JobInterruptionPredicate jobInterruptionPredicate;
+
+ public RunnableState() {
+ super(StateType.RUNNING);
+ if
(GobblinJobFiniteStateMachine.this.interruptGracefully == null) {
+ this.jobInterruptionPredicate = null;
+ } else {
+ this.jobInterruptionPredicate = new
JobInterruptionPredicate(GobblinJobFiniteStateMachine.this.jobState,
+
GobblinJobFiniteStateMachine.this::interruptRunningJob, false);
+ }
+ }
+
+ @Override
+ public void onEnterState(@Nullable JobFSMState previousState) {
+ if (this.jobInterruptionPredicate != null) {
+ this.jobInterruptionPredicate.startAsync();
+ }
+ }
+
+ @Override
+ public void onLeaveState(JobFSMState nextState) {
+ if (this.jobInterruptionPredicate != null) {
+ this.jobInterruptionPredicate.stopAsync();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
+
+ /**
+ * A runnable that allows for {@link IOException}s.
+ */
+ @FunctionalInterface
+ public interface RunnableWithIoException {
+ void run() throws IOException;
+ }
+
+ private final JobState jobState;
+ private final RunnableWithIoException interruptGracefully;
+ private final RunnableWithIoException killJob;
+
+ @lombok.Builder
+ private GobblinJobFiniteStateMachine(JobState jobState,
RunnableWithIoException interruptGracefully,
+ RunnableWithIoException killJob) {
+ super(buildAllowedTransitions(), Sets.newHashSet(new
JobFSMState(StateType.CANCELLED)), new JobFSMState(StateType.FAILED),
+ new JobFSMState(StateType.PREPARING));
+
+ if (jobState == null) {
+ throw new IllegalArgumentException("Job state is
required.");
+ }
+
+ this.jobState = jobState;
+ this.interruptGracefully = interruptGracefully;
+ this.killJob = killJob;
+ }
+
+ /**
+ * Callers should use this method to obtain the {@link JobFSMState} for
a particular {@link StateType}, as the
+ * {@link JobFSMState} might contain additional functionality like
running other services, etc.
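+ * @param stateType the {@link StateType} of the desired state
+ * @return a {@code RunnableState} for {@link StateType#RUNNING}, otherwise a plain {@link JobFSMState} of that type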
+ */
+ public JobFSMState getEndStateForType(StateType stateType) {
+ switch (stateType) {
+ case RUNNING:
+ return new RunnableState();
+ default:
+ return new JobFSMState(stateType);
+ }
+ }
+
+ private void interruptRunningJob() {
+ log.info("Interrupting job execution.");
+ try (FiniteStateMachine<JobFSMState>.Transition transition =
startTransition(getEndStateForType(StateType.INTERRUPTED))) {
+ try {
+ this.interruptGracefully.run();
+ } catch (IOException ioe) {
+
transition.changeEndState(getEndStateForType(StateType.FAILED));
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+ log.error("Cannot interrupt job.", exc);
+ } catch (InterruptedException |
FailedTransitionCallbackException exc) {
+ log.error("Cannot finish graceful job interruption.
Killing job.", exc);
+ try {
+ this.killJob.run();
+ } catch (IOException ioe) {
+ log.error("Failed to kill job.", ioe);
+ }
+ if (exc instanceof FailedTransitionCallbackException) {
+ ((FailedTransitionCallbackException)
exc).getTransition().switchEndStateToErrorState();
+ ((FailedTransitionCallbackException)
exc).getTransition().closeWithoutCallbacks();
+ }
+ }
+ }
+
+ private static SetMultimap<JobFSMState, JobFSMState>
buildAllowedTransitions() {
+ SetMultimap<JobFSMState, JobFSMState> transitions =
HashMultimap.create();
+ transitions.put(new JobFSMState(StateType.PREPARING), new
JobFSMState(StateType.RUNNING));
+ transitions.put(new JobFSMState(StateType.PREPARING), new
JobFSMState(StateType.FAILED));
+ transitions.put(new JobFSMState(StateType.PREPARING), new
JobFSMState(StateType.INTERRUPTED));
+ transitions.put(new JobFSMState(StateType.RUNNING), new
JobFSMState(StateType.SUCCESS));
+ transitions.put(new JobFSMState(StateType.RUNNING), new
JobFSMState(StateType.FAILED));
+ transitions.put(new JobFSMState(StateType.RUNNING), new
JobFSMState(StateType.INTERRUPTED));
+ return transitions;
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
new file mode 100644
index 0000000..6f67657
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobInterruptionPredicate.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.util.ReflectivePredicateEvaluator;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * This class evaluates a predicate on the {@link JobProgress} of a job and
calls a job interruption hook when the
+ * predicate is satisfied.
+ *
+ * It is used to preemptively stop jobs after they satisfy some completion
predicate (e.g. more than 15 minutes have
+ * elapsed and at least 75% of tasks have finished).
+ */
+@Slf4j
+public class JobInterruptionPredicate extends AbstractScheduledService {
+
+ public static final String INTERRUPTION_SQL =
"org.apache.gobblin.jobInterruptionPredicate.sql";
+
+ private final String sql;
+ private final ReflectivePredicateEvaluator evaluator;
+ private final JobProgress jobProgress;
+ private final Runnable jobInterruptionHook;
+
+ public JobInterruptionPredicate(JobState jobState, Runnable
jobInterruptionHook, boolean autoStart) {
+ this(jobState, jobState.getProp(INTERRUPTION_SQL),
jobInterruptionHook, autoStart);
+ }
+
+ protected JobInterruptionPredicate(JobProgress jobProgress, String
predicate,
+ Runnable jobInterruptionHook, boolean autoStart) {
+ this.sql = predicate;
+
+ ReflectivePredicateEvaluator tmpEval = null;
+ if (this.sql != null) {
+ try {
+ tmpEval = new
ReflectivePredicateEvaluator(this.sql, JobProgress.class, TaskProgress.class);
+ } catch (SQLException exc) {
+ log.warn("Job interruption predicate is
invalid, will not preemptively interrupt job.", exc);
+ }
+ }
+ this.evaluator = tmpEval;
+ this.jobProgress = jobProgress;
+ this.jobInterruptionHook = jobInterruptionHook;
+
+ if (autoStart && this.sql != null) {
+ startAsync();
+ }
+ }
+
+ @Override
+ protected void runOneIteration() {
+ if (this.evaluator == null) {
+ stopAsync();
+ return;
+ }
+ switch (this.jobProgress.getState()) {
+ case PENDING:
+ return;
+ case RUNNING:
+ try {
+ List<Object> objects =
Stream.concat(Stream.<Object>of(this.jobProgress),
this.jobProgress.getTaskProgress().stream()).collect(
+ Collectors.toList());
+ if (this.evaluator.evaluate(objects)) {
+ log.info("Interrupting job due
to satisfied job interruption predicate. Predicate: " + this.sql);
+ this.jobInterruptionHook.run();
+ stopAsync();
+ }
+ } catch (Throwable exc) {
+ log.warn("Failed to evaluate job
interruption predicate. Will not preemptively interrupt job.", exc);
+ throw Throwables.propagate(exc);
+ }
+ break;
+ default:
+ log.info(String.format("Detected job finished
with state %s. Stopping job interruption predicate.",
+ this.jobProgress.getState()));
+ stopAsync();
+ }
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedDelaySchedule(30, 30,
TimeUnit.SECONDS);
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
new file mode 100644
index 0000000..ad1308a
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/JobProgress.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.List;
+
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * Type used to retrieve the progress of a Gobblin job.
+ *
+ * {@link JobInterruptionPredicate} passes this interface to its SQL predicate evaluator as a reference interface,
+ * so job interruption predicates can query these getters as columns (e.g. "SELECT completedTasks > 5 FROM jobProgress").
+ */
+public interface JobProgress {
+
+ // NOTE(review): the meaning of the four simple getters below is taken from their names only; the unit of
+ // getElapsedTime() is not visible here -- confirm against the implementing class.
+ String getJobId();
+ int getTaskCount();
+ int getCompletedTasks();
+ long getElapsedTime();
+ // JobInterruptionPredicate skips evaluation while this is PENDING, evaluates the predicate while it is RUNNING,
+ // and treats any other value as "job finished".
+ JobState.RunningState getState();
+
+ // Each element is handed to the predicate evaluator together with this JobProgress instance.
+ List<? extends TaskProgress> getTaskProgress();
+
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
new file mode 100644
index 0000000..1ec8d61
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job/TaskProgress.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/**
+ * Interface used to retrieve the progress of a task in a Gobblin job.
+ *
+ * {@link JobInterruptionPredicate} passes this interface to its SQL predicate evaluator as a reference interface,
+ * so job interruption predicates can query task rows (e.g. "... FROM taskProgress WHERE workingState = 'FAILED'").
+ */
+public interface TaskProgress {
+
+ // NOTE(review): semantics of the identifiers and of isCompleted() are inferred from names -- confirm against
+ // the implementing class.
+ String getJobId();
+ String getTaskId();
+ // Enum-valued getters are exposed to predicates as strings, hence comparisons such as workingState = 'FAILED'.
+ WorkUnitState.WorkingState getWorkingState();
+ boolean isCompleted();
+
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index fc0e197..6b6bc16 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.runtime.job.JobInterruptionPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,8 +144,12 @@ public class LocalJobLauncher extends AbstractJobLauncher {
}
});
+ Thread thisThread = Thread.currentThread();
+ JobInterruptionPredicate jobInterruptionPredicate =
+ new JobInterruptionPredicate(jobState, () -> thisThread.interrupt(),
true);
GobblinMultiTaskAttempt.runWorkUnits(this.jobContext,
workUnitsWithJobState, this.taskStateTracker,
this.taskExecutor, GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+ jobInterruptionPredicate.stopAsync();
if (this.cancellationRequested) {
// Wait for the cancellation execution if it has been requested
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 1f53054..529fe11 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,6 +27,9 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.fsm.FiniteStateMachine;
+import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +85,8 @@ import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.MetricGroup;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -110,6 +115,9 @@ import org.apache.gobblin.util.SerializationUtils;
*/
public class MRJobLauncher extends AbstractJobLauncher {
+ private static final String INTERRUPT_JOB_FILE_NAME = "_INTERRUPT_JOB";
+ private static final String GOBBLIN_JOB_INTERRUPT_PATH_KEY =
"gobblin.jobInterruptPath";
+
private static final Logger LOG =
LoggerFactory.getLogger(MRJobLauncher.class);
private static final String JOB_NAME_PREFIX = "Gobblin-";
@@ -148,6 +156,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
private final StateStore<TaskState> taskStateStore;
private final int jarFileMaximumRetry;
+ private final Path interruptPath;
+ private final GobblinJobFiniteStateMachine fsm;
public MRJobLauncher(Properties jobProps) throws Exception {
this(jobProps, null);
@@ -170,6 +180,9 @@ public class MRJobLauncher extends AbstractJobLauncher {
throws Exception {
super(jobProps, metadataTags);
+ this.fsm =
GobblinJobFiniteStateMachine.builder().jobState(jobContext.getJobState())
+
.interruptGracefully(this::interruptGracefully).killJob(this::killJob).build();
+
this.conf = conf;
// Put job configuration properties into the Hadoop configuration so they
are available in the mappers
JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps,
this.conf);
@@ -185,6 +198,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.mrJobDir = new Path(
new
Path(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
this.jobContext.getJobName()),
this.jobContext.getJobId());
+ this.interruptPath = new Path(this.mrJobDir, INTERRUPT_JOB_FILE_NAME);
if (this.fs.exists(this.mrJobDir)) {
LOG.warn("Job working directory already exists for job " +
this.jobContext.getJobName());
this.fs.delete(this.mrJobDir, true);
@@ -252,26 +266,35 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.taskStateCollectorService.startAsync().awaitRunning();
LOG.info("Launching Hadoop MR job " + this.job.getJobName());
- this.job.submit();
- this.hadoopJobSubmitted = true;
+ try (FiniteStateMachine<JobFSMState>.Transition t =
this.fsm.startTransition(this.fsm.getEndStateForType(StateType.RUNNING))) {
+ try {
+ this.job.submit();
+ } catch (Throwable exc) {
+ t.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+ throw exc;
+ }
+ this.hadoopJobSubmitted = true;
- // Set job tracking URL to the Hadoop job tracking URL if it is not set
yet
- if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
- jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY,
this.job.getTrackingURL());
+ // Set job tracking URL to the Hadoop job tracking URL if it is not
set yet
+ if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+ jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY,
this.job.getTrackingURL());
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+ LOG.error("Cannot start MR job.", unallowed);
}
- TimingEvent mrJobRunTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
- LOG.info(String.format("Waiting for Hadoop MR job %s to complete",
this.job.getJobID()));
- this.job.waitForCompletion(true);
- mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId",
this.job.getJobID().toString()));
+ if (this.fsm.getCurrentState().getStateType().equals(StateType.RUNNING))
{
+ TimingEvent mrJobRunTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
+ LOG.info(String.format("Waiting for Hadoop MR job %s to complete",
this.job.getJobID()));
- if (this.cancellationRequested) {
- // Wait for the cancellation execution if it has been requested
- synchronized (this.cancellationExecution) {
- if (this.cancellationExecuted) {
- return;
- }
- }
+ this.job.waitForCompletion(true);
+
this.fsm.transitionIfAllowed(fsm.getEndStateForType(StateType.SUCCESS));
+
+ mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId",
this.job.getJobID().toString()));
+ }
+
+ if
(this.fsm.getCurrentState().getStateType().equals(StateType.CANCELLED)) {
+ return;
}
// Create a metrics set for this job run from the Hadoop counters.
@@ -286,18 +309,52 @@ public class MRJobLauncher extends AbstractJobLauncher {
@Override
protected void executeCancellation() {
- try {
- if (this.hadoopJobSubmitted && !this.job.isComplete()) {
- LOG.info("Killing the Hadoop MR job for job " +
this.jobContext.getJobId());
- this.job.killJob();
- // Collect final task states.
- this.taskStateCollectorService.stopAsync().awaitTerminated();
+ try (FiniteStateMachine<JobFSMState>.Transition transition =
+
this.fsm.startTransition(this.fsm.getEndStateForType(StateType.CANCELLED))) {
+ if (transition.getStartState().getStateType().equals(StateType.RUNNING))
{
+ try {
+ killJob();
+ } catch (IOException ioe) {
+ LOG.error("Failed to kill the Hadoop MR job for job " +
this.jobContext.getJobId());
+
transition.changeEndState(this.fsm.getEndStateForType(StateType.FAILED));
+ }
+ }
+ } catch (GobblinJobFiniteStateMachine.FailedTransitionCallbackException
exc) {
+ exc.getTransition().switchEndStateToErrorState();
+ exc.getTransition().closeWithoutCallbacks();
+ } catch (FiniteStateMachine.UnallowedTransitionException |
InterruptedException exc) {
+ LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+ }
+ }
+
+ /**
+ * Attempt a graceful interruption of the running job
+ */
+ private void interruptGracefully() throws IOException {
+ LOG.info("Attempting graceful interruption of job " +
this.jobContext.getJobId());
+
+ this.fs.createNewFile(this.interruptPath);
+
+ long waitTimeStart = System.currentTimeMillis();
+ while (!this.job.isComplete() && System.currentTimeMillis() <
waitTimeStart + 30 * 1000) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ break;
}
- } catch (IllegalStateException ise) {
- LOG.error("The Hadoop MR job has not started for job " +
this.jobContext.getJobId());
- } catch (IOException ioe) {
- LOG.error("Failed to kill the Hadoop MR job for job " +
this.jobContext.getJobId());
}
+
+ if (!this.job.isComplete()) {
+ LOG.info("Interrupted job did not shut itself down after timeout.
Killing job.");
+ this.job.killJob();
+ }
+ }
+
+ private void killJob() throws IOException {
+ LOG.info("Killing the Hadoop MR job for job " +
this.jobContext.getJobId());
+ this.job.killJob();
+ // Collect final task states.
+ this.taskStateCollectorService.stopAsync().awaitTerminated();
}
/**
@@ -381,6 +438,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
Integer.parseInt(this.jobProps.getProperty(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)));
}
+ this.job.getConfiguration().set(GOBBLIN_JOB_INTERRUPT_PATH_KEY,
this.interruptPath.toString());
+
mrJobSetupTimer.stop();
}
@@ -679,6 +738,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
@Override
public void run(Context context) throws IOException, InterruptedException {
this.setup(context);
+
+ Path interruptPath = new
Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
+ if (this.fs.exists(interruptPath)) {
+ LOG.info(String.format("Found interrupt path %s indicating the driver
has interrupted the job, aborting mapper.", interruptPath));
+ return;
+ }
+
GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
try {
// De-serialize and collect the list of WorkUnits to run
@@ -701,7 +767,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
gobblinMultiTaskAttempt =
GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(),
context.getTaskAttemptID().toString(),
this.jobState, this.workUnits, this.taskStateTracker,
this.taskExecutor, this.taskStateStore,
- multiTaskAttemptCommitPolicy, jobBroker);
+ multiTaskAttemptCommitPolicy, jobBroker, (gmta) -> {
+ try {
+ return this.fs.exists(interruptPath);
+ } catch (IOException ioe) {
+ return false;
+ }
+ });
if (this.isSpeculativeEnabled) {
LOG.info("will not commit in task attempt");
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index f2cd04b..9a798c1 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -15,40 +15,6 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.gobblin.runtime.spec_catalog;
import java.io.Closeable;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
new file mode 100644
index 0000000..ead087d
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/util/ReflectivePredicateEvaluator.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ProjectableFilterableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+
+import lombok.Data;
+
+
+/**
+ * An predicate evaluator that uses an interface to define a table schema and
can evaluate SQL statements on instances of
+ * that interface. See {@link ReflectivePredicateEvaluatorTest} for examples.
+ *
+ * Note all evaluated statements must return a single row with a single
boolean column.
+ *
+ * Usage:
+ * ReflectivePredicateEvaluator<MyInterface> evaluator = new
ReflectivePredicateEvaluator<>(MyInterface.class, "SELECT ... FROM
myInterface");
+ * evaluator.evaluate(instance1, instance2, ...); // use the statement
provided in constructor
+ * -- or --
+ * evaluator.evaluate("SELECT ... FROM myInterface", instance1, instance2,
...);
+ */
+public class ReflectivePredicateEvaluator implements Closeable {
+ private static final String REFERENCE_INTERFACES = "refInterface";
+ private static final String OPERATOR_ID = "operatorId";
+ private static final Pattern FIELD_NAME_EXTRACTOR =
Pattern.compile("(?:get([A-Z]))?(.+)");
+
+ private static final String MODEL_PATTERN = "{"
+ + "version: 1, defaultSchema: 'MAIN',"
+ + "schemas: ["
+ + "{name: 'MAIN', type: 'custom', factory: '%s',
operand: {%s: '%s', %s: '%d'}}"
+ + "]}";
+ private static final String CONNECT_STRING_PATTERN =
"jdbc:calcite:model=inline:%s";
+
+ private static final Cache<Integer, ReflectivePredicateEvaluator>
REGISTRY = CacheBuilder.newBuilder().weakValues().build();
+ private static final AtomicInteger IDENTIFIER = new AtomicInteger();
+
+ private final List<Class<?>> referenceInterfaces;
+ private final int identifier;
+ private final Connection conn;
+ private final PreparedStatement stmnt;
+
+ private final String sql;
+
+ private volatile List<Object> objects;
+
+ /**
+ * @param sql The default SQL expression to run in this evaluator.
+ * @param referenceInterfaces The interface that will be used to
generate the table schema.
+ * @throws SQLException
+ */
+ public ReflectivePredicateEvaluator(String sql, Class<?>...
referenceInterfaces) throws SQLException {
+ this.referenceInterfaces =
Lists.newArrayList(referenceInterfaces);
+ this.sql = sql;
+
+ this.identifier = IDENTIFIER.getAndIncrement();
+ REGISTRY.put(this.identifier, this);
+
+ String model = computeModel();
+ String connectString = String.format(CONNECT_STRING_PATTERN,
model);
+
+ this.conn =
+ DriverManager.getConnection(connectString);
+ this.stmnt = prepareStatement(sql);
+ }
+
+ private PreparedStatement prepareStatement(String sql) throws
SQLException {
+ PreparedStatement stmnt = null;
+ try {
+ stmnt = this.conn.prepareStatement(sql);
+ validateSql(stmnt, sql);
+ return stmnt;
+ } catch (Throwable t) {
+ if (stmnt != null) {
+ stmnt.close();
+ }
+ throw t;
+ }
+ }
+
+ private String computeModel() {
+ return String.format(MODEL_PATTERN,
PESchemaFactory.class.getName(), REFERENCE_INTERFACES,
+
Joiner.on(",").join(this.referenceInterfaces.stream().map(Class::getName).collect(Collectors.toList())),
+ OPERATOR_ID, this.identifier);
+ }
+
+ private void validateSql(PreparedStatement stmnt, String sql) throws
SQLException {
+ ResultSetMetaData metaData = stmnt.getMetaData();
+
+ if (metaData.getColumnCount() != 1 || metaData.getColumnType(1)
!= Types.BOOLEAN) {
+ throw new IllegalArgumentException("Statement is
expected to return a single boolean column. Provided statement: " + sql);
+ }
+ }
+
+ /**
+ * Evaluate the default predicate on the list of provided objects.
+ * @throws SQLException
+ */
+ public boolean evaluate(Object... objects) throws SQLException{
+ return evaluate(Lists.newArrayList(objects), null);
+ }
+
+ /**
+ * Evaluate an ad-hoc predicate on the list of provided objects.
+ * Note {@link #evaluate(Object...)} is preferable as it only does
validation of the expression once.
+ * @throws SQLException
+ */
+ public boolean evaluate(String sql, Object... objects) throws
SQLException{
+ return evaluate(Lists.newArrayList(objects), sql);
+ }
+
+ /**
+ * Evaluate the default predicate on the list of provided objects.
+ * @throws SQLException
+ */
+ public boolean evaluate(List<Object> objects) throws SQLException {
+ return evaluate(objects, null);
+ }
+
+ /**
+ * Evaluate an ad-hoc predicate on the list of provided objects.
+ * Note {@link #evaluate(Object[])} is preferable as it only does
validation of the expression once.
+ * @throws SQLException
+ */
+ public boolean evaluate(List<Object> objects, String sql) throws
SQLException {
+ synchronized (this) {
+ String actualSql = sql == null ? this.sql : sql;
+ PreparedStatement actualStmnt = null;
+ try {
+ actualStmnt = sql == null ? this.stmnt :
prepareStatement(sql);
+
+ this.objects = objects;
+ actualStmnt.execute();
+ ResultSet rs = actualStmnt.getResultSet();
+ if (!rs.next()) {
+ throw new
IllegalArgumentException("Expected at least one returned row. SQL evaluated: "
+ actualSql);
+ }
+ boolean result = true;
+ do {
+ result &= rs.getBoolean(1);
+ } while (rs.next());
+ return result;
+ } finally {
+ if (sql != null && actualStmnt != null) {
+ actualStmnt.close();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ try {
+ if (this.stmnt != null) {
+ this.stmnt.close();
+ }
+ if (this.conn != null) {
+ this.conn.close();
+ }
+ } catch (SQLException exc) {
+ throw new IOException("Failed to close " +
ReflectivePredicateEvaluator.class.getSimpleName(), exc);
+ }
+ }
+
+ /**
+ * Calcite {@link SchemaFactory} used for the evaluator.
+ * This class is public because Calcite uses reflection to instantiate
it, there is no reason to use it anywhere else
+ * in Gobblin.
+ */
+ public static class PESchemaFactory implements SchemaFactory {
+
+ @Override
+ public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
+ try {
+ List<Class<?>> referenceInterfaces = new
ArrayList<>();
+ for (String iface :
Splitter.on(",").splitToList(operand.get(REFERENCE_INTERFACES).toString())) {
+
referenceInterfaces.add(Class.forName(iface));
+ }
+ int operatorIdentifier =
Integer.parseInt(operand.get(OPERATOR_ID).toString());
+
+ return new AbstractSchema() {
+ @Override
+ protected Map<String, Table>
getTableMap() {
+ HashMap<String, Table> map =
new HashMap<>();
+ for (Class<?> iface :
referenceInterfaces) {
+
map.put(iface.getSimpleName().toUpperCase(),
+ new
PETable(iface, operatorIdentifier));
+ }
+ return map;
+ }
+ };
+ } catch (ReflectiveOperationException roe) {
+ throw new RuntimeException(roe);
+ }
+ }
+ }
+
+ @Data
+ private static class PETable extends AbstractTable implements
ProjectableFilterableTable {
+ private final Class<?> referenceInterface;
+ private final int operatorIdentifier;
+ private volatile boolean initialized = false;
+
+ private RelDataType rowType;
+ private List<Function<Object, Object>> methodsForFields = new
ArrayList<>();
+
+ @Override
+ public Enumerable<Object[]> scan(DataContext root,
List<RexNode> filters, int[] projects) {
+ List<Object> list =
REGISTRY.getIfPresent(this.operatorIdentifier).objects;
+
+ final int[] actualProjects = resolveProjects(projects);
+
+ Enumerator<Object[]> enumerator =
Linq4j.enumerator(list.stream()
+ .filter(o ->
referenceInterface.isAssignableFrom(o.getClass()))
+ .map(
+ m -> {
+ Object[] res = new
Object[actualProjects.length];
+ for (int i = 0; i <
actualProjects.length; i++) {
+ res[i] =
methodsForFields.get(actualProjects[i]).apply(m);
+ }
+ return res;
+ }
+ ).collect(Collectors.toList()));
+
+ return new AbstractEnumerable<Object[]>() {
+ @Override
+ public Enumerator<Object[]> enumerator() {
+ return enumerator;
+ }
+ };
+ }
+
+ private int[] resolveProjects(int[] projects) {
+ if (projects == null) {
+ projects = new int[methodsForFields.size()];
+ for (int i = 0; i < projects.length; i++) {
+ projects[i] = i;
+ }
+ }
+ return projects;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ initialize((JavaTypeFactory) typeFactory);
+ return this.rowType;
+ }
+
+ private synchronized void initialize(JavaTypeFactory
typeFactory) {
+ if (this.initialized) {
+ return;
+ }
+
+ this.methodsForFields = new ArrayList<>();
+ List<RelDataTypeField> fields = new ArrayList<>();
+
+ for (Method method :
this.referenceInterface.getMethods()) {
+ if (method.getParameterCount() == 0) {
+ String fieldName =
computeFieldName(method.getName());
+ if (fieldName != null) {
+
this.methodsForFields.add(extractorForMethod(method));
+ Class<?> retType =
method.getReturnType();
+ if (retType.isEnum()) {
+ retType = String.class;
+ }
+ fields.add(new
RelDataTypeFieldImpl(fieldName.toUpperCase(), fields.size(),
typeFactory.createType(retType)));
+ }
+ }
+ }
+
+ this.rowType = new MyDataType(fields,
referenceInterface);
+ this.initialized = true;
+ }
+
+ private Function<Object, Object> extractorForMethod(Method
method) {
+ return o -> {
+ try {
+ Object ret = method.invoke(o);
+ return method.getReturnType().isEnum()
? ret.toString() : ret;
+ } catch (ReflectiveOperationException roe) {
+ throw new RuntimeException(roe);
+ }
+ };
+ }
+
+ }
+
+ private static class MyDataType extends RelDataTypeImpl {
+ private final String typeString;
+
+ public MyDataType(List<? extends RelDataTypeField> fieldList,
Class<?> refInterface) {
+ super(fieldList);
+ this.typeString = refInterface.getName();
+ computeDigest();
+ }
+
+ @Override
+ protected void generateTypeString(StringBuilder sb, boolean
withDetail) {
+ sb.append(typeString);
+ }
+ }
+
+ private static String computeFieldName(String methodName) {
+ Matcher matcher = FIELD_NAME_EXTRACTOR.matcher(methodName);
+ if (matcher.matches()) {
+ return matcher.group(1) + matcher.group(2);
+ }
+ return null;
+ }
+
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
new file mode 100644
index 0000000..21625a8
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job/JobInterruptionPredicateTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobState;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+/**
+ * Tests for {@link JobInterruptionPredicate}. The predicates are created with autoStart disabled and
+ * {@link JobInterruptionPredicate#runOneIteration()} is invoked directly, so no scheduling is involved.
+ */
+public class JobInterruptionPredicateTest {
+
+  /** A predicate on job-level columns fires once the job progress satisfies it. */
+  @Test
+  public void testJobPredicate() {
+    SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING, new ArrayList<>());
+    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+    JobInterruptionPredicate predicate =
+        new JobInterruptionPredicate(jobProgress, "SELECT completedTasks > 5 FROM jobProgress", () -> atomicBoolean.set(true), false);
+
+    predicate.runOneIteration();
+    Assert.assertFalse(atomicBoolean.get());
+
+    jobProgress.completedTasks = 6;
+
+    predicate.runOneIteration();
+    Assert.assertTrue(atomicBoolean.get());
+  }
+
+  /** A predicate on task-level columns fires once one of the tasks satisfies it. */
+  @Test
+  public void testTaskPredicate() {
+    SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+    // Distinct task id: the original fixture reused "t1" for both tasks.
+    SettableTaskProgress t2 = new SettableTaskProgress("j1", "t2", WorkUnitState.WorkingState.RUNNING, false);
+
+    SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+        Lists.newArrayList(t1, t2));
+    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+    JobInterruptionPredicate predicate =
+        new JobInterruptionPredicate(jobProgress, "SELECT count(*) > 0 FROM taskProgress WHERE workingState = 'FAILED'", () -> atomicBoolean.set(true), false);
+
+    predicate.runOneIteration();
+    Assert.assertFalse(atomicBoolean.get());
+
+    t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+    predicate.runOneIteration();
+    Assert.assertTrue(atomicBoolean.get());
+  }
+
+  /** A predicate combining job-level and task-level conditions fires when either condition holds. */
+  @Test
+  public void testTaskAndJobPredicate() {
+    SettableTaskProgress t1 = new SettableTaskProgress("j1", "t1", WorkUnitState.WorkingState.RUNNING, false);
+    // Distinct task id: the original fixture reused "t1" for both tasks.
+    SettableTaskProgress t2 = new SettableTaskProgress("j1", "t2", WorkUnitState.WorkingState.RUNNING, false);
+
+    SettableJobProgress jobProgress = new SettableJobProgress("job123", 10, 0, 0, JobState.RunningState.RUNNING,
+        Lists.newArrayList(t1, t2));
+    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+
+    JobInterruptionPredicate predicate =
+        new JobInterruptionPredicate(jobProgress,
+            "SELECT EXISTS(SELECT * FROM (SELECT completedTasks > 5 AS pred FROM jobProgress UNION SELECT count(*) > 0 AS pred FROM taskProgress WHERE workingState = 'FAILED') WHERE pred)",
+            () -> atomicBoolean.set(true), false);
+
+    predicate.runOneIteration();
+    Assert.assertFalse(atomicBoolean.get());
+
+    // Task-level condition alone triggers the hook.
+    t2.workingState = WorkUnitState.WorkingState.FAILED;
+
+    predicate.runOneIteration();
+    Assert.assertTrue(atomicBoolean.get());
+    atomicBoolean.set(false);
+
+    // With neither condition satisfied the hook is not run again.
+    t2.workingState = WorkUnitState.WorkingState.RUNNING;
+
+    predicate.runOneIteration();
+    Assert.assertFalse(atomicBoolean.get());
+
+    // Job-level condition alone triggers the hook.
+    jobProgress.completedTasks = 6;
+
+    predicate.runOneIteration();
+    Assert.assertTrue(atomicBoolean.get());
+  }
+
+  /** {@link JobProgress} whose fields can be changed by the tests. */
+  @Getter
+  @AllArgsConstructor
+  public static class SettableJobProgress implements JobProgress {
+    private final String jobId;
+    private int taskCount;
+    private int completedTasks;
+    private long elapsedTime;
+    private JobState.RunningState runningState;
+    private List<TaskProgress> taskProgress;
+
+    @Override
+    public JobState.RunningState getState() {
+      return this.runningState;
+    }
+  }
+
+  /** {@link TaskProgress} whose working state can be changed by the tests. */
+  @Getter
+  @AllArgsConstructor
+  public static class SettableTaskProgress implements TaskProgress {
+    private final String jobId;
+    private final String taskId;
+    private WorkUnitState.WorkingState workingState;
+    private boolean isCompleted;
+  }
+
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
new file mode 100644
index 0000000..ba7dc4b
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+
+/**
+ * Tests for {@link ReflectivePredicateEvaluator}: SQL predicates evaluated over objects that are exposed as tables
+ * named after the interfaces they implement.
+ */
+public class ReflectivePredicateEvaluatorTest {
+
+  /** Single-row predicates, with both the constructor-supplied query and a per-call query. */
+  @Test
+  public void simpleTest() throws Exception {
+    ReflectivePredicateEvaluator evaluator =
+        new ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface", MyInterface.class);
+
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo")));
+
+    Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+        new MyImplementation(1, "bar")));
+    Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+        new MyImplementation(2, "foo")));
+    Assert.assertFalse(evaluator.evaluate("SELECT anInt = 1 OR aString = 'foo' FROM myInterface",
+        new MyImplementation(2, "bar")));
+  }
+
+  /** An aggregate is computed over all objects passed to a single evaluate call. */
+  @Test
+  public void testWithAggregations() throws Exception {
+    ReflectivePredicateEvaluator evaluator =
+        new ReflectivePredicateEvaluator("SELECT sum(anInt) = 5 FROM myInterface", MyInterface.class);
+
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo")));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo"), new MyImplementation(4, "foo")));
+  }
+
+  /** A WHERE clause restricts which objects contribute to the aggregate. */
+  @Test
+  public void testWithAggregationsAndFilter() throws Exception {
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT sum(anInt) = 5 FROM myInterface WHERE aString = 'foo'", MyInterface.class);
+
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo")));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo"),
+        new MyImplementation(4, "bar")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation(4, "foo"),
+        new MyImplementation(4, "foo")));
+  }
+
+  /** Objects of different interfaces land in different tables and can be combined in one query. */
+  @Test
+  public void testMultipleInterfaces() throws Exception {
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT true = ALL (SELECT sum(anInt) = 2 AS satisfied FROM myInterface UNION SELECT sum(anInt) = 3 AS satisfied FROM myInterface2)",
+        MyInterface.class, MyInterface2.class);
+
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, "foo")));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(2, "foo"), new MyImplementation2(3)));
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "foo"), new MyImplementation2(3),
+        new MyImplementation(1, "foo")));
+  }
+
+  /** When the query yields several rows, the result is true only if every row is true. */
+  @Test
+  public void testMultipleOutputs() throws Exception {
+    ReflectivePredicateEvaluator evaluator =
+        new ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface", MyInterface.class);
+
+    Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, "bar"), new MyImplementation(1, "foo")));
+    Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, "bar"), new MyImplementation(2, "foo")));
+  }
+
+  /** A query whose output is not a boolean must be rejected at construction time. */
+  @Test
+  public void testInvalidSQL() throws Exception {
+    try {
+      new ReflectivePredicateEvaluator("SELECT anInt FROM myInterface", MyInterface.class);
+      Assert.fail("Expected the non-boolean query to be rejected.");
+    } catch (IllegalArgumentException exc) {
+      // Expected
+    }
+  }
+
+  /** A valid query that produces no rows must be rejected at evaluation time. */
+  @Test
+  public void testNoOutputs() throws Exception {
+    // Built outside the try: the query itself is valid, so an IllegalArgumentException from the constructor
+    // must fail the test instead of being mistaken for the expected evaluation failure.
+    ReflectivePredicateEvaluator evaluator = new ReflectivePredicateEvaluator(
+        "SELECT anInt = 1 FROM myInterface WHERE aString = 'foo'", MyInterface.class);
+    try {
+      evaluator.evaluate(new MyImplementation(1, "bar"));
+      Assert.fail("Expected evaluation with no output rows to be rejected.");
+    } catch (IllegalArgumentException exc) {
+      // Expected
+    }
+  }
+
+  /** Table "myInterface" with columns anInt and aString. */
+  private interface MyInterface {
+    int getAnInt();
+    String getAString();
+  }
+
+  @Data
+  private static class MyImplementation implements MyInterface {
+    private final int anInt;
+    private final String aString;
+  }
+
+  /** Table "myInterface2" with a single column anInt. */
+  private interface MyInterface2 {
+    int getAnInt();
+  }
+
+  @Data
+  private static class MyImplementation2 implements MyInterface2 {
+    private final int anInt;
+  }
+
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
new file mode 100644
index 0000000..20a2bf8
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of a basic FiniteStateMachine that allows keeping track of its state and gating certain
+ * logic on whether a transition is valid or not.
+ *
+ * This class is useful in situations where logic is complex, possibly
multi-threaded, and can take multiple paths. Certain
+ * pieces of logic (for example running a job, publishing a dataset, etc) can
only happen if other actions ended correctly,
+ * and the FSM is a way of simplifying the encoding and verification of those
conditions. It is understood that state
+ * transitions may not be instantaneous, and that other state transitions
should not start until the current one has
+ * been resolved.
+ *
+ * All public methods of this class will wait until the FSM is in a
non-transitioning state. If multiple transitions are
+ * queued at the same time, the order in which they are executed is
essentially random.
+ *
+ * The states supported by FSM can be enums or instances of any base type. The
legality of a transition is determined
+ * by equality, i.e. if a transition A -> B is legal, the current state is A'
and the desired end state is B', the transition
+ * will be legal if A.equals(A') && B.equals(B'). This allows for storing
additional information into the current state
+ * as long as it does not affect the equality check (i.e. fields that are not
compared in the equals check can store
+ * state metadata, etc.).
+ *
+ * Suggested Usage:
+ * FiniteStateMachine<MySymbols> fsm =
+ *     new FiniteStateMachine.Builder<MySymbols>().addTransition(START_SYMBOL, END_SYMBOL).build(initialSymbol);
+ *
+ * try (Transition transition = fsm.startTransition(MY_END_STATE)) {
+ * try {
+ * // my logic
+ * } catch (MyException exc) {
+ * transition.changeEndState(MY_ERROR);
+ * }
+ * } catch (UnallowedTransitionException exc) {
+ * // Cannot execute logic because it's an illegal transition!
+ * } catch (ReentrantStableStateWait exc) {
+ * // Somewhere in the logic an instruction tried to do an operation with
the fsm that would likely cause a deadlock
+ * } catch (AbandonedTransitionException exc) {
+ * // Another thread initiated a transition and became inactive ending the
transition
+ * } catch (InterruptedException exc) {
+ * // Could not start transition because thread got interrupted while
waiting for a non-transitioning state
+ * } catch (FailedTransitionCallbackException exc) {
+ * // A callback in the transition start or end states has failed.
+ * // example handling (changeEndState returns void, so the two calls cannot be chained)
+ * exc.getTransition().changeEndState(MY_ERROR);
+ * exc.getTransition().closeWithoutCallbacks();
+ * }
+ *
+ * @param <T> the type of the states of this machine.
+ */
+@Slf4j
+public class FiniteStateMachine<T> {
+
+ /**
+ * Used to build a {@link FiniteStateMachine} instance.
+ */
+ public static class Builder<T> {
+ private final SetMultimap<T, T> allowedTransitions;
+ private final Set<T> universalEnds;
+ private T errorState;
+
+ public Builder() {
+ this.allowedTransitions = HashMultimap.create();
+ this.universalEnds = new HashSet<>();
+ }
+
+ /**
+ * Add a legal transition to the {@link FiniteStateMachine}.
+ */
+ public Builder<T> addTransition(T startState, T endState) {
+ this.allowedTransitions.put(startState, endState);
+ return this;
+ }
+
+ /**
+ * Specify that a state is a valid end state for a transition
starting from any state. Useful for example for
+ * error states.
+ */
+ public Builder<T> addUniversalEnd(T state) {
+ this.universalEnds.add(state);
+ return this;
+ }
+
+ /**
+ * Specify the error state to which this machine can transition if nothing
else is possible. Note the error state
+ * is always an allowed end state.
+ */
+ public Builder<T> errorState(T state) {
+ this.errorState = state;
+ return this;
+ }
+
+ /**
+ * Build a {@link FiniteStateMachine} starting at the given
initial state.
+ */
+ public FiniteStateMachine<T> build(T initialState) {
+ return new
FiniteStateMachine<>(this.allowedTransitions, this.universalEnds,
this.errorState, initialState);
+ }
+ }
+
+ private final SetMultimap<T, T> allowedTransitions;
+ private final Set<T> universalEnds;
+ private final T errorState;
+
+ private final ReentrantReadWriteLock lock;
+ private final Condition condition;
+ private final T initialState;
+
+ private volatile T currentState;
+ private volatile Transition currentTransition;
+
+  /**
+   * Create a machine positioned at {@code initialState}. The collections are stored as given, without copying.
+   * If the initial state is a {@link StateWithCallbacks}, its enter callback is invoked with a {@code null}
+   * previous state.
+   */
+  protected FiniteStateMachine(SetMultimap<T, T> allowedTransitions, Set<T> universalEnds, T errorState,
+      T initialState) {
+    this.lock = new ReentrantReadWriteLock();
+    // Waiters for a non-transitioning state park on a condition of the write lock.
+    this.condition = this.lock.writeLock().newCondition();
+
+    this.allowedTransitions = allowedTransitions;
+    this.universalEnds = universalEnds;
+    this.errorState = errorState;
+
+    this.initialState = initialState;
+    this.currentState = initialState;
+
+    if (initialState instanceof StateWithCallbacks) {
+      ((StateWithCallbacks) initialState).onEnterState(null);
+    }
+  }
+
+  /**
+   * Start a transition to the end state specified. The returned {@link Transition} object is a closeable that will
+   * finalize the transition when it is closed. While the transition is open, no other transition can start.
+   *
+   * It is recommended to call this method only within a try-with-resource block to ensure the transition is closed.
+   *
+   * @throws UnallowedTransitionException If the transition is not allowed.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws ReentrantStableStateWait if the calling thread already owns the open transition.
+   * @throws AbandonedTransitionException if the open transition is owned by a thread that is no longer alive.
+   */
+  public Transition startTransition(T endState) throws UnallowedTransitionException, InterruptedException {
+    // Acquire before entering the try: if lock() itself fails, the finally must not unlock a lock we never held.
+    this.lock.writeLock().lock();
+    try {
+      while (isTransitioning()) {
+        this.condition.await();
+      }
+      if (!isAllowedTransition(this.currentState, endState)) {
+        throw new UnallowedTransitionException(this.currentState, endState);
+      }
+      Transition transition = new Transition(endState);
+      this.currentTransition = transition;
+      return transition;
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Move to the given end state in one step: equivalent to {@link #startTransition(Object)} followed right away by
+   * {@link Transition#close()}.
+   *
+   * @throws UnallowedTransitionException if the transition is not allowed.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws FailedTransitionCallbackException if a state callback fails while closing; the transition is then still
+   *         open and is available from the exception.
+   */
+  public void transitionImmediately(T endState)
+      throws UnallowedTransitionException, InterruptedException, FailedTransitionCallbackException {
+    startTransition(endState).close();
+  }
+
+  /**
+   * Attempt an immediate transition to the given end state, reporting an illegal transition through the return
+   * value instead of an exception.
+   *
+   * @return true if the transition happened, false if it is not an allowed transition.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws FailedTransitionCallbackException if a state callback fails while closing the transition.
+   */
+  public boolean transitionIfAllowed(T endState) throws InterruptedException, FailedTransitionCallbackException {
+    try {
+      transitionImmediately(endState);
+      return true;
+    } catch (UnallowedTransitionException exc) {
+      return false;
+    }
+  }
+
+  /**
+   * Get the current state. This method will wait until the FSM is in a non-transitioning state (although a
+   * transition may start immediately after).
+   *
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   * @throws ReentrantStableStateWait if the calling thread owns the currently open transition.
+   */
+  public T getCurrentState() throws InterruptedException {
+    // Need to get lock to make sure we're not in transitioning state. It is acquired before the try so that the
+    // finally never tries to release a read lock that was not obtained.
+    this.lock.readLock().lock();
+    try {
+      waitForNonTransitioningReadLock();
+
+      return this.currentState;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+ /**
+ * Read the current state without taking any lock and without waiting for an open transition to close. While a
+ * transition is open this is still the state the transition started from.
+ */
+ @VisibleForTesting
+ T getCurrentStateEvenIfTransitioning() {
+ return this.currentState;
+ }
+
+ /**
+ * The clone is handed the same transition and universal-end collection instances as this FSM (they are not
+ * copied) and gets its own lock. If the initial state is a {@link StateWithCallbacks}, constructing the clone
+ * invokes its enter callback again with a null previous state.
+ *
+ * @return A clone of this FSM starting at the initial state of the FSM.
+ */
+ public FiniteStateMachine<T> cloneAtInitialState() {
+ return new FiniteStateMachine<>(this.allowedTransitions,
this.universalEnds, this.errorState, this.initialState);
+ }
+
+  /**
+   * Waits until this FSM is in a non-transitioning state before reading the state to clone from.
+   *
+   * @return A clone of this FSM starting at the current state of the FSM.
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   */
+  public FiniteStateMachine<T> cloneAtCurrentState() throws InterruptedException {
+    // Acquired before the try so that the finally never releases a read lock that was not obtained.
+    this.lock.readLock().lock();
+    try {
+      waitForNonTransitioningReadLock();
+
+      return new FiniteStateMachine<>(this.allowedTransitions, this.universalEnds, this.errorState,
+          this.currentState);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Waits for a read lock in a non-transitioning state. The caller MUST hold the read lock before calling this
+   * method, and the read lock is held again when this method exits, whether it returns normally or throws. Callers
+   * can therefore always release the read lock in a finally block.
+   *
+   * @throws InterruptedException if the thread got interrupted while waiting for a non-transitioning state.
+   */
+  private void waitForNonTransitioningReadLock() throws InterruptedException {
+    if (!isTransitioning()) {
+      return;
+    }
+
+    // The condition belongs to the write lock and a read lock cannot be upgraded, so release the read lock first.
+    this.lock.readLock().unlock();
+    this.lock.writeLock().lock();
+    try {
+      while (isTransitioning()) {
+        this.condition.await();
+      }
+    } finally {
+      // Downgrade on every exit path. If await() is interrupted or isTransitioning() throws, the caller's finally
+      // still unlocks the read lock; without re-acquiring it here that unlock would fail with an
+      // IllegalMonitorStateException and hide the original exception.
+      this.lock.readLock().lock();
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Tell whether a transition is currently open.
+   *
+   * @return true if another live thread owns an open transition, false if no transition is open.
+   * @throws ReentrantStableStateWait if the open transition is owned by the calling thread.
+   * @throws AbandonedTransitionException if the open transition is owned by a thread that is no longer alive.
+   */
+  private boolean isTransitioning() {
+    Transition openTransition = this.currentTransition;
+    if (openTransition == null) {
+      return false;
+    }
+
+    Thread owner = openTransition.ownerThread;
+    if (Thread.currentThread().equals(owner)) {
+      throw new ReentrantStableStateWait(
+          "Tried to check for non-transitioning state from a thread that had already initiated a transition, "
+              + "this may indicate a deadlock. To change end state use Transition.changeEndState() instead.");
+    }
+    if (!owner.isAlive()) {
+      throw new AbandonedTransitionException(owner);
+    }
+    return true;
+  }
+
+  /**
+   * Decide whether moving from {@code startState} to {@code endState} is legal. The error state and every universal
+   * end are reachable from any state; anything else must be an explicitly registered transition.
+   */
+  protected boolean isAllowedTransition(T startState, T endState) {
+    boolean reachableFromAnywhere = endState.equals(this.errorState) || this.universalEnds.contains(endState);
+    if (reachableFromAnywhere) {
+      return true;
+    }
+    Set<T> registeredEnds = this.allowedTransitions.get(startState);
+    return registeredEnds != null && registeredEnds.contains(endState);
+  }
+
+ /**
+ * A handle used for controlling the transition of the {@link FiniteStateMachine}. Note if this handle is lost the
+ * {@link FiniteStateMachine} will likely go into an invalid state.
+ *
+ * This is an inner (non-static) class: each handle is bound to the machine that created it.
+ */
+ public class Transition implements Closeable {
+ // Thread that created this handle; the machine compares it against waiting threads to detect re-entrant waits
+ // and checks its liveness to detect abandoned transitions.
+ private final Thread ownerThread;
+ private volatile T endState;
+ private volatile boolean closed;
+
+ private Transition(T endState) {
+ this.ownerThread = Thread.currentThread();
+ this.endState = endState;
+ this.closed = false;
+ }
+
+ /**
+ * Get the state at the beginning of this transition. This reads the machine's current state, which is only
+ * replaced by the end state when the transition closes.
+ *
+ * @throws IllegalStateException if this transition is already closed.
+ */
+ public T getStartState() {
+ if (this.closed) {
+ throw new IllegalStateException("Transition
already closed.");
+ }
+ return FiniteStateMachine.this.currentState;
+ }
+
+ /**
+ * Change the end state of the transition. The new end state must be a legal transition for the state when the
+ * {@link Transition} was created.
+ *
+ * @throws UnallowedTransitionException if the new end state is not an allowed transition.
+ * @throws IllegalStateException if this transition is already closed.
+ */
+ public synchronized void changeEndState(T endState) throws
UnallowedTransitionException {
+ if (this.closed) {
+ throw new IllegalStateException("Transition
already closed.");
+ }
+ if
(!isAllowedTransition(FiniteStateMachine.this.currentState, endState)) {
+ throw new
UnallowedTransitionException(FiniteStateMachine.this.currentState, endState);
+ }
+ this.endState = endState;
+ }
+
+ /**
+ * Change the end state of the transition to the FSM error state. Unlike {@link #changeEndState(Object)}, this
+ * performs no closed check and no legality check.
+ */
+ public synchronized void switchEndStateToErrorState() {
+ this.endState = FiniteStateMachine.this.errorState;
+ }
+
+ /**
+ * Close the current transition moving the {@link FiniteStateMachine} to the end state and releasing all locks.
+ *
+ * @throws FailedTransitionCallbackException when start or end state callbacks fail. Note if this exception is thrown
+ * the transition is not complete and the error must be handled to complete it.
+ */
+ @Override
+ public void close() throws FailedTransitionCallbackException {
+ doClose(true);
+ }
+
+ /**
+ * Close the current transition moving the {@link FiniteStateMachine} to the end state and releasing all locks without
+ * calling any callbacks. This method should only be called after a {@link #close()} has failed and the failure
+ * cannot be handled.
+ */
+ public void closeWithoutCallbacks() {
+ try {
+ doClose(false);
+ } catch (FailedTransitionCallbackException exc) {
+ throw new IllegalStateException(String.format("Close
without callbacks threw a %s. This is an error in code.",
+ FailedTransitionCallbackException.class), exc);
+ }
+ }
+
+ /**
+ * Shared implementation of the two close variants; a no-op if the transition is already closed.
+ *
+ * Under the machine's write lock it runs, in order: the leave callback of the start state, the enter callback of
+ * the end state (both only if withCallbacks is true and the state is a StateWithCallbacks), then marks this
+ * handle closed, publishes the end state, clears the open transition and wakes all waiting threads.
+ *
+ * If either callback throws, the failure is wrapped in a FailedTransitionCallbackException and none of the
+ * closing steps run: the handle stays open and remains the machine's current transition.
+ */
+ private synchronized void doClose(boolean withCallbacks) throws
FailedTransitionCallbackException {
+ if (this.closed) {
+ return;
+ }
+
+ try {
+ FiniteStateMachine.this.lock.writeLock().lock();
+
+ try {
+ if (withCallbacks && getStartState() instanceof StateWithCallbacks) {
+ ((StateWithCallbacks<T>)
getStartState()).onLeaveState(this.endState);
+ }
+ } catch (Throwable t) {
+ throw new FailedTransitionCallbackException(this,
FailedCallback.START_STATE, t);
+ }
+
+ try {
+ if (withCallbacks && this.endState instanceof StateWithCallbacks) {
+ ((StateWithCallbacks) this.endState).onEnterState(getStartState());
+ }
+ } catch (Throwable t) {
+ throw new FailedTransitionCallbackException(this,
FailedCallback.END_STATE, t);
+ }
+
+ this.closed = true;
+
+ FiniteStateMachine.this.currentState = this.endState;
+ FiniteStateMachine.this.currentTransition = null;
+ FiniteStateMachine.this.condition.signalAll();
+ } finally {
+ FiniteStateMachine.this.lock.writeLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * Thrown when a requested transition is not allowed. Carries the start state and the rejected end state;
+ * getters for both are generated by Lombok.
+ */
+ @Getter
+ public static class UnallowedTransitionException extends Exception {
+ private final Object startState;
+ private final Object endState;
+
+ public UnallowedTransitionException(Object startState, Object
endState) {
+ super(String.format("Unallowed transition: %s -> %s",
startState, endState));
+ this.startState = startState;
+ this.endState = endState;
+ }
+ }
+
+ /**
+ * Thrown when a thread that has started a transition is waiting for a non-transitioning state, which is a
+ * deadlock situation. This is an unchecked exception.
+ */
+ public static class ReentrantStableStateWait extends RuntimeException {
+ public ReentrantStableStateWait(String message) {
+ super(message);
+ }
+ }
+
+  /**
+   * Thrown when a transition was initiated by a thread that no longer exists, likely implying that the transition
+   * can never be closed. The thread that started the transition is available through the generated getter.
+   */
+  @Getter
+  public static class AbandonedTransitionException extends RuntimeException {
+    private final Thread startingThread;
+
+    public AbandonedTransitionException(Thread startingThread) {
+      super(String.format("Thread %s initiated a transition but became inactive before closing it.",
+          startingThread));
+      this.startingThread = startingThread;
+    }
+  }
+
+ /**
+ * Identifies which callback failed while closing a transition: START_STATE is the leave callback of the start
+ * state, END_STATE is the enter callback of the end state.
+ */
+ public enum FailedCallback {
+ START_STATE, END_STATE
+ }
+
+ /**
+ * Thrown when a state callback fails while closing a transition. Carries the still-open transition so the
+ * handler can change its end state and close it again, which callback failed, and the original failure (also set
+ * as the cause). Getters are generated by Lombok.
+ */
+ @Getter
+ public static class FailedTransitionCallbackException extends
IOException {
+ // Stored as a raw Transition type, so the generated getter returns the raw type as well.
+ private final FiniteStateMachine.Transition transition;
+ private final FailedCallback failedCallback;
+ private final Throwable originalException;
+
+ public FailedTransitionCallbackException(FiniteStateMachine<?>.Transition
transition, FailedCallback failedCallback,
+ Throwable originalException) {
+ super("Failed callbacks when ending transition.", originalException);
+ this.transition = transition;
+ this.failedCallback = failedCallback;
+ this.originalException = originalException;
+ }
+ }
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
new file mode 100644
index 0000000..9a77a7d
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/fsm/StateWithCallbacks.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * A state for a {@link FiniteStateMachine} which supports callbacks when entering and leaving the state.
+ * Both callbacks default to no-ops, so implementations override only the ones they need.
+ * @param <T> supertype of states in the FSM.
+ */
+public interface StateWithCallbacks<T> {
+
+ /**
+ * Called when an FSM reaches this state.
+ * @param previousState the previous state of the machine, or null when this state is the initial state of a
+ * newly constructed machine.
+ */
+ default void onEnterState(@Nullable T previousState) {
+ // do nothing
+ }
+
+ /**
+ * Called when an FSM leaves this state.
+ * @param nextState the next state of the machine.
+ */
+ default void onLeaveState(T nextState) {
+ // do nothing
+ }
+
+}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
new file mode 100644
index 0000000..ec50d02
--- /dev/null
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/fsm/FiniteStateMachineTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class FiniteStateMachineTest {
+
+ /** States of the reference machine used by the enum-based tests. */
+ public enum MyStates {
+ PENDING, RUNNING, SUCCESS, ERROR
+ }
+
+ // Reference machine: PENDING -> RUNNING -> SUCCESS, with ERROR reachable from PENDING and RUNNING.
+ // Tests work on clones of it rather than on this shared instance.
+ private final FiniteStateMachine<MyStates> refFsm = new
FiniteStateMachine.Builder<MyStates>()
+ .addTransition(MyStates.PENDING, MyStates.RUNNING)
+ .addTransition(MyStates.RUNNING, MyStates.SUCCESS)
+ .addTransition(MyStates.PENDING, MyStates.ERROR)
+ .addTransition(MyStates.RUNNING,
MyStates.ERROR).build(MyStates.PENDING);
+
+ /** Walks the legal paths of the reference machine with immediate transitions and checks both clone variants. */
+ @Test
+ public void singleThreadImmediateTransitionsTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ // Happy path: PENDING -> RUNNING -> SUCCESS
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ fsm.transitionImmediately(MyStates.RUNNING);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+ fsm.transitionImmediately(MyStates.SUCCESS);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.SUCCESS);
+
+ // A clone at the initial state starts over at PENDING regardless of where the source machine is
+ fsm = fsm.cloneAtInitialState();
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ fsm.transitionImmediately(MyStates.ERROR);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+
+ // A clone at the current state keeps the source machine's state
+ fsm = fsm.cloneAtCurrentState();
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+ }
+
+ /** Unregistered transitions are rejected and leave the machine in its previous state. */
+ @Test
+ public void illegalTransitionsTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ // Self transition PENDING -> PENDING was never registered
+ try {
+ fsm.transitionImmediately(MyStates.PENDING);
+ Assert.fail();
+ } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+ // expected
+ }
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+ // PENDING -> SUCCESS skips RUNNING and was never registered
+ try {
+ fsm.transitionImmediately(MyStates.SUCCESS);
+ Assert.fail();
+ } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+ // expected
+ }
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+ // A rejected transition does not block later legal ones
+ fsm.transitionImmediately(MyStates.RUNNING);
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+ }
+
+ /**
+ * Exercises transitions held open in a try-with-resources block: re-entrant use of the machine from the owning
+ * thread is rejected, and the end state can be changed before the transition closes.
+ */
+ @Test
+ public void slowTransitionsTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+ try (FiniteStateMachine.Transition transition =
fsm.startTransition(MyStates.RUNNING)) {
+ try {
+ fsm.getCurrentState();
+ Assert.fail();
+ } catch (FiniteStateMachine.ReentrantStableStateWait
exc) {
+ // Expected because the same thread that is transitioning tries to read the current state
+ }
+ try {
+ fsm.transitionImmediately(MyStates.RUNNING);
+ Assert.fail();
+ } catch (FiniteStateMachine.ReentrantStableStateWait
exc) {
+ // Expected because the same thread that is transitioning tries to start another transition
+ }
+ }
+ // The failed re-entrant calls did not disturb the open transition, which completed on close
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.RUNNING);
+
+ // The end state requested at start (SUCCESS) is replaced by ERROR before the transition closes
+ try (FiniteStateMachine<MyStates>.Transition transition =
fsm.startTransition(MyStates.SUCCESS)) {
+ transition.changeEndState(MyStates.ERROR);
+ }
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.ERROR);
+ }
+
+ @Test
+ public void callbackTest() throws Exception {
+ NamedStateWithCallback stateA = new NamedStateWithCallback("a");
+ NamedStateWithCallback stateB = new NamedStateWithCallback("b");
+ NamedStateWithCallback stateC = new NamedStateWithCallback("c", null, s ->
{
+ throw new RuntimeException("leave");
+ });
+ NamedStateWithCallback stateD = new NamedStateWithCallback("d");
+
+ FiniteStateMachine<NamedStateWithCallback> fsm = new
FiniteStateMachine.Builder<NamedStateWithCallback>()
+ .addTransition(new NamedStateWithCallback("a"), new
NamedStateWithCallback("b"))
+ .addTransition(new NamedStateWithCallback("b"), new
NamedStateWithCallback("c"))
+ .addTransition(new NamedStateWithCallback("c"), new
NamedStateWithCallback("d"))
+ .addUniversalEnd(new NamedStateWithCallback("ERROR"))
+ .build(stateA);
+
+ fsm.transitionImmediately(stateB);
+
+ Assert.assertEquals(fsm.getCurrentState(), stateB);
+ Assert.assertEquals(stateA.lastTransition, "leave:a->b");
+ stateA.lastTransition = "";
+ Assert.assertEquals(stateB.lastTransition, "enter:a->b");
+ stateB.lastTransition = "";
+
+ try {
+ // State that will error on enter
+ fsm.transitionImmediately(new NamedStateWithCallback("c", s -> {
+ throw new RuntimeException("enter");
+ }, s -> {
+ throw new RuntimeException("leave");
+ }));
+ Assert.fail("Expected excpetion");
+ } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+ Assert.assertEquals(exc.getFailedCallback(),
FiniteStateMachine.FailedCallback.END_STATE);
+ Assert.assertEquals(exc.getOriginalException().getMessage(), "enter");
+ // switch state to one that will only error on leave
+ exc.getTransition().changeEndState(stateC);
+ exc.getTransition().close();
+ }
+
+ Assert.assertEquals(fsm.getCurrentState(), stateC);
+ Assert.assertEquals(stateB.lastTransition, "leave:b->c");
+ stateB.lastTransition = "";
+ Assert.assertEquals(stateC.lastTransition, "enter:b->c");
+ stateC.lastTransition = "";
+
+ try {
+ fsm.transitionImmediately(stateD);
+ Assert.fail("Expected exception");
+ } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+ Assert.assertEquals(exc.getFailedCallback(),
FiniteStateMachine.FailedCallback.START_STATE);
+ Assert.assertEquals(exc.getOriginalException().getMessage(), "leave");
+ // switch state to one that will only error on leave
+ exc.getTransition().changeEndState(new NamedStateWithCallback("ERROR"));
+ exc.getTransition().closeWithoutCallbacks();
+ }
+
+ Assert.assertEquals(fsm.getCurrentState(), new
NamedStateWithCallback("ERROR"));
+ Assert.assertEquals(stateD.lastTransition, "");
+ }
+
+ /**
+ * Two threads compete for transitions on the same machine: the second one must not start its transition until
+ * the first one has closed its own.
+ * NOTE(review): relies on Transitioner keeping its transition open until its thread is interrupted - confirm
+ * against the Transitioner implementation.
+ */
+ @Test(timeOut = 5000)
+ public void multiThreadTest() throws Exception {
+ FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+ Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+ Transitioner<MyStates> t1 = new Transitioner<>(fsm,
MyStates.RUNNING);
+ Transitioner<MyStates> t2 = new Transitioner<>(fsm,
MyStates.ERROR);
+
+ // t1 opens PENDING -> RUNNING; the machine still reports PENDING while the transition is open
+ Thread t1Thread = new Thread(null, t1, "t1");
+ t1Thread.start();
+ t1.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+ Assert.assertEquals(t1.transitionResult,
TransitionState.TRANSITIONING);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(),
MyStates.PENDING);
+
+ // t2 cannot get past STARTING while t1's transition is open
+ Thread t2Thread = new Thread(null, t2, "t2");
+ t2Thread.start();
+ Assert.assertEquals(t1.transitionResult,
TransitionState.TRANSITIONING);
+ Assert.assertEquals(t2.transitionResult,
TransitionState.STARTING);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(),
MyStates.PENDING);
+
+ // Interrupting t1 ends its transition, which lets t2 open RUNNING -> ERROR
+ t1Thread.interrupt();
+ t1.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+ t2.awaitState(Sets.newHashSet(TransitionState.TRANSITIONING));
+ Assert.assertEquals(t1.transitionResult,
TransitionState.COMPLETED);
+ Assert.assertEquals(t2.transitionResult,
TransitionState.TRANSITIONING);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(),
MyStates.RUNNING);
+
+ // Interrupting t2 ends its transition as well
+ t2Thread.interrupt();
+ t2.awaitState(Sets.newHashSet(TransitionState.COMPLETED));
+ Assert.assertEquals(t1.transitionResult,
TransitionState.COMPLETED);
+ Assert.assertEquals(t2.transitionResult,
TransitionState.COMPLETED);
+ Assert.assertEquals(fsm.getCurrentStateEvenIfTransitioning(),
MyStates.ERROR);
+ }
+
+  /**
+   * Starts a transition on a helper thread that exits without ever closing it, then checks that a
+   * subsequent {@code transitionImmediately} call fails with
+   * {@link FiniteStateMachine.AbandonedTransitionException}.
+   *
+   * @throws Exception if waiting for the helper thread is interrupted or the state machine throws
+   *         something other than the expected exception
+   */
+  @Test(timeOut = 5000)
+  public void deadThreadTest() throws Exception {
+    FiniteStateMachine<MyStates> fsm = refFsm.cloneAtInitialState();
+
+    Assert.assertEquals(fsm.getCurrentState(), MyStates.PENDING);
+
+    Thread t = new Thread(() -> {
+      try {
+        // since we don't close the transition, it should become orphaned
+        fsm.startTransition(MyStates.RUNNING);
+      } catch (FiniteStateMachine.UnallowedTransitionException | InterruptedException exc) {
+        // do nothing
+      }
+    });
+    t.start();
+
+    // Wait for the helper thread to die; join() returns as soon as it has terminated.
+    t.join();
+
+    try {
+      fsm.transitionImmediately(MyStates.RUNNING);
+      Assert.fail();
+    } catch (FiniteStateMachine.AbandonedTransitionException exc) {
+      // Expected
+    }
+  }
+
+  /**
+   * Runnable test helper that opens a transition to {@code endState} on the given state machine, holds
+   * it open until its thread is interrupted (or 2 seconds elapse), and publishes its progress through
+   * {@code transitionResult} so that the test thread can wait on it with {@link #awaitState(Set)}.
+   */
+  @Data
+  private class Transitioner<T> implements Runnable {
+    private final FiniteStateMachine<T> fsm;
+    private final T endState;
+
+    // Guards progress updates so that waiters in awaitState cannot miss a signal.
+    private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    private volatile boolean running = false;
+    private volatile TransitionState transitionResult = TransitionState.STARTING;
+
+    /**
+     * Opens the transition, reports {@code TRANSITIONING}, then sleeps until interrupted. An interrupt
+     * during the sleep is the cue to close the transition and report {@code COMPLETED}; failures while
+     * starting or closing the transition are reported through the matching {@link TransitionState}.
+     */
+    @Override
+    public void run() {
+      try (FiniteStateMachine.Transition transition = this.fsm.startTransition(this.endState)) {
+        goToState(TransitionState.TRANSITIONING);
+        try {
+          Thread.sleep(2000);
+          // Nobody interrupted us in time; publish through goToState so waiters are signalled too.
+          goToState(TransitionState.TIMEOUT);
+          return;
+        } catch (InterruptedException ie) {
+          // This is the signal to end the state transition, so do nothing
+        }
+      } catch (InterruptedException exc) {
+        goToState(TransitionState.INTERRUPTED);
+        return;
+      } catch (FiniteStateMachine.UnallowedTransitionException exc) {
+        goToState(TransitionState.UNALLOWED);
+        return;
+      } catch (FiniteStateMachine.FailedTransitionCallbackException exc) {
+        goToState(TransitionState.CALLBACK_ERROR);
+        return;
+      }
+      goToState(TransitionState.COMPLETED);
+    }
+
+    /**
+     * Blocks the caller until {@code transitionResult} is one of {@code states}.
+     *
+     * @param states the progress values that end the wait
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void awaitState(Set<TransitionState> states) throws InterruptedException {
+      // Acquire before entering try so that finally never unlocks a lock that was not obtained.
+      this.lock.lock();
+      try {
+        while (!states.contains(this.transitionResult)) {
+          this.condition.await();
+        }
+      } finally {
+        this.lock.unlock();
+      }
+    }
+
+    /**
+     * Records {@code state} as the current progress and wakes every thread blocked in
+     * {@link #awaitState(Set)}.
+     *
+     * @param state the new progress value
+     */
+    public void goToState(TransitionState state) {
+      // Acquire before entering try so that finally never unlocks a lock that was not obtained.
+      this.lock.lock();
+      try {
+        this.transitionResult = state;
+        this.condition.signalAll();
+      } finally {
+        this.lock.unlock();
+      }
+    }
+  }
+
+  /** Progress values reported by a {@code Transitioner} while it attempts a state transition. */
+  enum TransitionState {
+    /** Initial value, before the transition has been opened. */
+    STARTING,
+    /** The transition has been opened and is being held. */
+    TRANSITIONING,
+    /** The transition was closed after the holding sleep was interrupted. */
+    COMPLETED,
+    /** An InterruptedException escaped from opening or closing the transition. */
+    INTERRUPTED,
+    /** The state machine rejected the requested transition. */
+    UNALLOWED,
+    /** The holding sleep ran to completion without being interrupted. */
+    TIMEOUT,
+    /** A transition callback failed. */
+    CALLBACK_ERROR
+  }
+
+  /**
+   * A state identified solely by its name (equality and hash code use only {@code name}). Each of the
+   * enter and leave hooks either delegates to an optional callback or, when no callback was supplied,
+   * records the transition in {@code lastTransition} as {@code "<hook>:<from>-><to>"}.
+   */
+  @RequiredArgsConstructor
+  @EqualsAndHashCode(of = "name")
+  public static class NamedStateWithCallback implements StateWithCallbacks<NamedStateWithCallback> {
+    @Getter
+    private final String name;
+    private final Function<NamedStateWithCallback, Void> enterCallback;
+    private final Function<NamedStateWithCallback, Void> leaveCallback;
+
+    // Description of the most recent hook that ran without a custom callback; empty until then.
+    String lastTransition = "";
+
+    /** Creates a state with no custom callbacks, so both hooks only record into {@code lastTransition}. */
+    public NamedStateWithCallback(String name) {
+      this(name, null, null);
+    }
+
+    /** Stores {@code "<callback>:<start name>-><end name>"}, printing a null start state as "null". */
+    private void setLastTransition(String callback, NamedStateWithCallback start, NamedStateWithCallback end) {
+      String startName = (start == null) ? "null" : start.name;
+      this.lastTransition = String.format("%s:%s->%s", callback, startName, end.name);
+    }
+
+    /** Delegates to the enter callback when one was supplied; otherwise records the transition. */
+    @Override
+    public void onEnterState(@Nullable NamedStateWithCallback previousState) {
+      if (this.enterCallback != null) {
+        this.enterCallback.apply(previousState);
+        return;
+      }
+      setLastTransition("enter", previousState, this);
+    }
+
+    /** Delegates to the leave callback when one was supplied; otherwise records the transition. */
+    @Override
+    public void onLeaveState(NamedStateWithCallback nextState) {
+      if (this.leaveCallback != null) {
+        this.leaveCallback.apply(nextState);
+        return;
+      }
+      setLastTransition("leave", this, nextState);
+    }
+  }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle
b/gradle/scripts/defaultBuildProperties.gradle
index 8509cc6..e772acd 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -25,7 +25,7 @@ def BuildProperties BUILD_PROPERTIES = new
BuildProperties(project)
.register(new BuildProperty("nexusArtifactSnapshotRepository",
"https://repository.apache.org/content/repositories/snapshots", "Maven
repository to publish artifacts"))
.register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies
version"))
.register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies
version"))
- .register(new BuildProperty("bytemanVersion", "2.2.1", "Byteman
dependencies version"))
+ .register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman
dependencies version"))
.register(new BuildProperty("confluentVersion", "2.0.1", "confluent
dependencies version"))
.register(new BuildProperty("doNotSignArtifacts", false, "Do not sight
Maven artifacts"))
.register(new BuildProperty("gobblinFlavor", "standard", "Build flavor
(see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index f7ff29c..0009397 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -121,8 +121,8 @@ ext.externalDependency = [
"bytemanBmunit": "org.jboss.byteman:byteman-bmunit:" + bytemanVersion,
"bcpgJdk15on": "org.bouncycastle:bcpg-jdk15on:1.52",
"bcprovJdk15on": "org.bouncycastle:bcprov-jdk15on:1.52",
- "calciteCore": "org.apache.calcite:calcite-core:1.3.0-incubating",
- "calciteAvatica": "org.apache.calcite:calcite-avatica:1.3.0-incubating",
+ "calciteCore": "org.apache.calcite:calcite-core:1.16.0",
+ "calciteAvatica": "org.apache.calcite:calcite-avatica:1.13.0",
"jhyde": "org.pentaho:pentaho-aggdesigner-algorithm:5.1.5-jhyde",
"curatorFramework": "org.apache.curator:curator-framework:2.10.0",
"curatorRecipes": "org.apache.curator:curator-recipes:2.10.0",
diff --git a/gradle/scripts/globalDependencies.gradle
b/gradle/scripts/globalDependencies.gradle
index d64db67..270a121 100644
--- a/gradle/scripts/globalDependencies.gradle
+++ b/gradle/scripts/globalDependencies.gradle
@@ -43,6 +43,7 @@ subprojects {
// Required to add JDK's tool jar, which is required to run byteman
tests.
testCompile (files(((URLClassLoader)
ToolProvider.getSystemToolClassLoader()).getURLs()))
}
+ all*.exclude group: 'org.apache.calcite', module: 'calcite-avatica' //
replaced by org.apache.calcite.avatica:avatica-core
}
}
}