[FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend
This closes #3411 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3446e66a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3446e66a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3446e66a Branch: refs/heads/master Commit: 3446e66aac63a3dfdaf8cfd4a73bd80a7f038379 Parents: 5b7f21d Author: Stephan Ewen <[email protected]> Authored: Fri Feb 17 17:51:00 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Feb 28 19:02:13 2017 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 12 +- .../state/RocksDBStateBackendFactory.java | 19 +- .../jobmanager/JMXJobManagerMetricTest.java | 2 +- .../CheckpointConfigHandlerTest.java | 3 + .../checkpoint/CheckpointCoordinator.java | 21 ++- .../runtime/executiongraph/ExecutionGraph.java | 4 +- .../executiongraph/ExecutionGraphBuilder.java | 32 +++- .../jobgraph/tasks/JobSnapshottingSettings.java | 15 +- .../runtime/state/AbstractStateBackend.java | 173 ++++++++++++++++++- .../runtime/state/StateBackendFactory.java | 16 +- .../state/filesystem/FsStateBackend.java | 31 +++- .../state/filesystem/FsStateBackendFactory.java | 22 +-- .../flink/runtime/state/heap/package-info.java | 23 +++ .../runtime/state/internal/package-info.java | 52 ++++++ .../state/memory/MemoryStateBackend.java | 2 +- .../checkpoint/CheckpointStatsTrackerTest.java | 1 + .../checkpoint/CoordinatorShutdownTest.java | 5 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 3 +- .../ArchivedExecutionGraphTest.java | 3 +- .../tasks/JobSnapshottingSettingsTest.java | 6 + .../jobmanager/JobManagerHARecoveryTest.java | 1 + .../runtime/jobmanager/JobManagerTest.java | 5 + .../flink/runtime/jobmanager/JobSubmitTest.java | 2 +- .../runtime/state/StateBackendLoadingTest.java | 164 ++++++++++++++++++ .../runtime/jobmanager/JobManagerITCase.scala | 3 + .../api/graph/StreamGraphGenerator.java | 2 +- 
.../api/graph/StreamingJobGraphGenerator.java | 1 + .../streaming/runtime/tasks/StreamTask.java | 72 ++------ .../runtime/tasks/BlockingCheckpointsTest.java | 2 +- .../streaming/runtime/tasks/StreamTaskTest.java | 56 +++--- .../streaming/runtime/StateBackendITCase.java | 2 +- 31 files changed, 609 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 3fd5d0f..dd0e2f7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -29,10 +29,12 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.util.AbstractID; + import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,7 +162,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { private void lazyInitializeForJob( Environment env, - String operatorIdentifier) throws Exception { + String operatorIdentifier) throws IOException { if (isInitialized) { return; @@ -193,7 +195,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { } if 
(dirs.isEmpty()) { - throw new Exception("No local storage directories available. " + errorMessage); + throw new IOException("No local storage directories available. " + errorMessage); } else { initializedDbBasePaths = dirs.toArray(new File[dirs.size()]); } @@ -235,7 +237,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry) throws IOException { // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling @@ -437,7 +439,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { // static library loading utilities // ------------------------------------------------------------------------ - private void ensureRocksDBIsLoaded(String tempDirectory) throws Exception { + private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException { synchronized (RocksDBStateBackend.class) { if (!rocksDbInitialized) { @@ -488,7 +490,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { } } - throw new Exception("Could not load the native RocksDB library", lastException); + throw new IOException("Could not load the native RocksDB library", lastException); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java index 5002272..bd9bcaa 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java @@ -15,24 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateBackendFactory; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; /** * A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend} * from a configuration. 
*/ -public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBackend> { +public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> { protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class); @@ -44,9 +45,11 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBa public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir"; @Override - public AbstractStateBackend createFromConfig(Configuration config) throws Exception { - String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); - String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); + public RocksDBStateBackend createFromConfig(Configuration config) + throws IllegalConfigurationException, IOException { + + final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); + final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); if (checkpointDirURI == null) { throw new IllegalConfigurationException( @@ -67,8 +70,8 @@ public class RocksDBStateBackendFactory implements StateBackendFactory<FsStateBa return backend; } catch (IllegalArgumentException e) { - throw new Exception("Cannot initialize RocksDB State Backend with URI '" - + checkpointDirURI + '.', e); + throw new IllegalConfigurationException( + "Cannot initialize RocksDB State Backend with URI '" + checkpointDirURI + '.', e); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index b3b7dfc..1fdac65 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -74,7 +74,7 @@ public class JMXJobManagerMetricTest { Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), - 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true)); + 500, 500, 50, 5, ExternalizedCheckpointSettings.none(), null, true)); flink.waitForActorsToBeAlive(); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java index e517c3c..95ced0a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java @@ -56,6 +56,7 @@ public class CheckpointConfigHandlerTest { minPause, maxConcurrent, externalized, + null, true); AccessExecutionGraph graph = mock(AccessExecutionGraph.class); @@ -92,6 +93,7 @@ public class CheckpointConfigHandlerTest { 1212L, 12, ExternalizedCheckpointSettings.none(), + null, false); // at least once AccessExecutionGraph graph = mock(AccessExecutionGraph.class); @@ -122,6 +124,7 @@ public class CheckpointConfigHandlerTest { 1212L, 12, externalizedSettings, + 
null, false); // at least once AccessExecutionGraph graph = mock(AccessExecutionGraph.class); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6da6f7d..0592e3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -85,6 +85,12 @@ public class CheckpointCoordinator { /** The job whose checkpoint this coordinator coordinates */ private final JobID job; + /** Default checkpoint properties **/ + private final CheckpointProperties checkpointProperties; + + /** The executor used for asynchronous calls, like potentially blocking I/O */ + private final Executor executor; + /** Tasks who need to be sent a message when a checkpoint is started */ private final ExecutionVertex[] tasksToTrigger; @@ -101,7 +107,9 @@ public class CheckpointCoordinator { * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; - /** Default directory for persistent checkpoints; <code>null</code> if none configured. */ + /** Default directory for persistent checkpoints; <code>null</code> if none configured. 
+ * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */ + @Nullable private final String checkpointDirectory; /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */ @@ -154,11 +162,6 @@ public class CheckpointCoordinator { @Nullable private CheckpointStatsTracker statsTracker; - /** Default checkpoint properties **/ - private final CheckpointProperties checkpointProperties; - - private final Executor executor; - // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -173,7 +176,7 @@ public class CheckpointCoordinator { ExecutionVertex[] tasksToCommitTo, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, - String checkpointDirectory, + @Nullable String checkpointDirectory, Executor executor) { // sanity checks @@ -211,6 +214,8 @@ public class CheckpointCoordinator { this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.checkpointDirectory = checkpointDirectory; + this.executor = checkNotNull(executor); + this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.timer = new Timer("Checkpoint Timer", true); @@ -229,8 +234,6 @@ public class CheckpointCoordinator { } catch (Throwable t) { throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t); } - - this.executor = checkNotNull(executor); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index ad4347d..a76a421 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; @@ -348,7 +349,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive return false; } - public void enableSnapshotCheckpointing( + public void enableCheckpointing( long interval, long checkpointTimeout, long minPauseBetweenCheckpoints, @@ -360,6 +361,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, String checkpointDir, + StateBackend metadataStore, CheckpointStatsTracker statsTracker) { // simple sanity checks http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index c558e43..2a79302 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -37,6 +38,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.util.DynamicCodeLoadingException; import org.slf4j.Logger; import javax.annotation.Nullable; @@ -71,8 +75,8 @@ public class ExecutionGraphBuilder { MetricGroup metrics, int parallelismForAutoMax, Logger log) - throws JobExecutionException, JobException - { + throws JobExecutionException, JobException { + checkNotNull(jobGraph, "job graph cannot be null"); final String jobName = jobGraph.getName(); @@ -191,7 +195,28 @@ public class ExecutionGraphBuilder { String externalizedCheckpointsDir = jobManagerConfig.getString( ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null); - executionGraph.enableSnapshotCheckpointing( + // load the state backend for checkpoint metadata. 
+ // if specified in the application, use from there, otherwise load from configuration + final StateBackend metadataBackend; + + final StateBackend applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend(); + if (applicationConfiguredBackend != null) { + metadataBackend = applicationConfiguredBackend; + + log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.", + applicationConfiguredBackend); + } + else { + try { + metadataBackend = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log); + } + catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) { + throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e); + } + } + + executionGraph.enableCheckpointing( snapshotSettings.getCheckpointInterval(), snapshotSettings.getCheckpointTimeout(), snapshotSettings.getMinPauseBetweenCheckpoints(), @@ -203,6 +228,7 @@ public class ExecutionGraphBuilder { checkpointIdCounter, completedCheckpoints, externalizedCheckpointsDir, + metadataBackend, checkpointStatsTracker); } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java index 561ba89..233aa88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.StateBackend; +import 
javax.annotation.Nullable; import java.util.List; import static java.util.Objects.requireNonNull; @@ -50,6 +52,10 @@ public class JobSnapshottingSettings implements java.io.Serializable { /** Settings for externalized checkpoints. */ private final ExternalizedCheckpointSettings externalizedCheckpointSettings; + /** The default state backend, if configured by the user in the job */ + @Nullable + private final StateBackend defaultStateBackend; + /** * Flag indicating whether exactly once checkpoint mode has been configured. * If <code>false</code>, at least once mode has been configured. This is @@ -58,7 +64,7 @@ public class JobSnapshottingSettings implements java.io.Serializable { * UI. */ private final boolean isExactlyOnce; - + public JobSnapshottingSettings( List<JobVertexID> verticesToTrigger, List<JobVertexID> verticesToAcknowledge, @@ -68,6 +74,7 @@ public class JobSnapshottingSettings implements java.io.Serializable { long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, ExternalizedCheckpointSettings externalizedCheckpointSettings, + @Nullable StateBackend defaultStateBackend, boolean isExactlyOnce) { // sanity checks @@ -84,6 +91,7 @@ public class JobSnapshottingSettings implements java.io.Serializable { this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings); + this.defaultStateBackend = defaultStateBackend; this.isExactlyOnce = isExactlyOnce; } @@ -121,6 +129,11 @@ public class JobSnapshottingSettings implements java.io.Serializable { return externalizedCheckpointSettings; } + @Nullable + public StateBackend getDefaultStateBackend() { + return defaultStateBackend; + } + public boolean isExactlyOnce() { return isExactlyOnce; } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index a335e45..2cf20a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -21,20 +21,50 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * An abstract base implementation of the {@link StateBackend} interface. 
+ * + * <p> */ @PublicEvolving public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable { private static final long serialVersionUID = 4620415814639230247L; + // ------------------------------------------------------------------------ + // Configuration shortcut names + // ------------------------------------------------------------------------ + + /** The shortcut configuration name for the MemoryState backend that checkpoints to the JobManager */ + public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager"; + + /** The shortcut configuration name for the FileSystem State backend */ + public static final String FS_STATE_BACKEND_NAME = "filesystem"; + + /** The shortcut configuration name for the RocksDB State Backend */ + public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + + // ------------------------------------------------------------------------ + // State Backend - Persisting Byte Storage + // ------------------------------------------------------------------------ + @Override public abstract CheckpointStreamFactory createStreamFactory( JobID jobId, @@ -46,6 +76,10 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri String operatorIdentifier, @Nullable String targetLocation) throws IOException; + // ------------------------------------------------------------------------ + // State Backend - State-Holding Backends + // ------------------------------------------------------------------------ + @Override public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, @@ -54,7 +88,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception; + TaskKvStateRegistry kvStateRegistry) throws IOException; @Override public OperatorStateBackend createOperatorStateBackend( @@ -63,4 +97,141 @@ 
public abstract class AbstractStateBackend implements StateBackend, java.io.Seri return new DefaultOperatorStateBackend(env.getUserClassLoader()); } + + // ------------------------------------------------------------------------ + // Loading the state backend from a configuration + // ------------------------------------------------------------------------ + + /** + * Loads the state backend from the configuration, from the parameter 'state.backend', as defined + * in {@link CoreOptions#STATE_BACKEND}. + * + * <p>The state backends can be specified either via their shortcut name, or via the class name + * of a {@link StateBackendFactory}. If a StateBackendFactory class name is specified, the factory + * is instantiated (via its zero-argument constructor) and its + * {@link StateBackendFactory#createFromConfig(Configuration)} method is called. + * + * <p>Recognized shortcut names are '{@value AbstractStateBackend#MEMORY_STATE_BACKEND_NAME}', + * '{@value AbstractStateBackend#FS_STATE_BACKEND_NAME}', and + * '{@value AbstractStateBackend#ROCKSDB_STATE_BACKEND_NAME}'. + * + * @param config The configuration to load the state backend from + * @param classLoader The class loader that should be used to load the state backend + * @param logger Optionally, a logger to log actions to (may be null) + * + * @return The instantiated state backend, or {@code null} if no state backend is configured. 
+ * + * @throws DynamicCodeLoadingException + * Thrown if a state backend factory is configured and the factory class was not + * found or the factory could not be instantiated + * @throws IllegalConfigurationException + * May be thrown by the StateBackendFactory when creating / configuring the state + * backend in the factory + * @throws IOException + * May be thrown by the StateBackendFactory when instantiating the state backend + */ + public static StateBackend loadStateBackendFromConfig( + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + checkNotNull(config, "config"); + checkNotNull(classLoader, "classLoader"); + + final String backendName = config.getString(CoreOptions.STATE_BACKEND); + if (backendName == null) { + return null; + } + + // by default the factory class is the backend name + String factoryClassName = backendName; + + switch (backendName.toLowerCase()) { + case MEMORY_STATE_BACKEND_NAME: + if (logger != null) { + logger.info("State backend is set to heap memory (checkpoint to JobManager)"); + } + return new MemoryStateBackend(); + + case FS_STATE_BACKEND_NAME: + FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config); + if (logger != null) { + logger.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", + fsBackend.getBasePath()); + } + return fsBackend; + + case ROCKSDB_STATE_BACKEND_NAME: + factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"; + // fall through to the 'default' case that uses reflection to load the backend + // that way we can keep RocksDB in a separate module + + default: + if (logger != null) { + logger.info("Loading state backend via factory {}", factoryClassName); + } + + StateBackendFactory<?> factory; + try { + @SuppressWarnings("rawtypes") + Class<? 
extends StateBackendFactory> clazz = + Class.forName(factoryClassName, false, classLoader) + .asSubclass(StateBackendFactory.class); + + factory = clazz.newInstance(); + } + catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + backendName, e); + } + catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException("The class configured under '" + + CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" + + backendName + ')', e); + } + + return factory.createFromConfig(config); + } + } + + /** + * Loads the state backend from the configuration, from the parameter 'state.backend', as defined + * in {@link CoreOptions#STATE_BACKEND}. If no state backend is configured, this instantiates the + * default state backend (the {@link MemoryStateBackend}). + * + * <p>Refer to {@link #loadStateBackendFromConfig(Configuration, ClassLoader, Logger)} for details on + * how the state backend is loaded from the configuration. + * + * @param config The configuration to load the state backend from + * @param classLoader The class loader that should be used to load the state backend + * @param logger Optionally, a logger to log actions to (may be null) + * + * @return The instantiated state backend. 
+ * + * @throws DynamicCodeLoadingException + * Thrown if a state backend factory is configured and the factory class was not + * found or the factory could not be instantiated + * @throws IllegalConfigurationException + * May be thrown by the StateBackendFactory when creating / configuring the state + * backend in the factory + * @throws IOException + * May be thrown by the StateBackendFactory when instantiating the state backend + */ + public static StateBackend loadStateBackendFromConfigOrCreateDefault( + Configuration config, + ClassLoader classLoader, + @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException { + + final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger); + + if (fromConfig != null) { + return fromConfig; + } + else { + if (logger != null) { + logger.info("No state backend has been configured, using default state backend (Memory / JobManager)"); + } + return new MemoryStateBackend(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java index 39e7ed2..78c976a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java @@ -18,17 +18,24 @@ package org.apache.flink.runtime.state; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import java.io.IOException; import java.io.Serializable; /** * A factory to create a specific state backend. 
The state backend creation gets a Configuration * object that can be used to read further config values. * + * <p>The state backend factory is typically specified in the configuration to produce a + * configured state backend. + * * @param <T> The type of the state backend created. */ -public interface StateBackendFactory<T extends AbstractStateBackend> extends Serializable { +@PublicEvolving +public interface StateBackendFactory<T extends StateBackend> extends Serializable { /** * Creates the state backend, optionally using the given configuration. @@ -36,7 +43,10 @@ public interface StateBackendFactory<T extends AbstractStateBackend> extends Ser * @param config The Flink configuration (loaded by the TaskManager). * @return The created state backend. * - * @throws Exception Exceptions during instantiation can be forwarded. + * @throws IllegalConfigurationException + * If the configuration misses critical values, or specifies invalid values + * @throws IOException + * If the state backend initialization failed due to an I/O exception */ - AbstractStateBackend createFromConfig(Configuration config) throws Exception; + T createFromConfig(Configuration config) throws IllegalConfigurationException, IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index b614d98..5e8a15d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -36,6 +36,8 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import static 
org.apache.flink.util.Preconditions.checkArgument; + /** * The file state backend is a state backend that stores the state of streaming jobs in a file system. * @@ -139,17 +141,14 @@ public class FsStateBackend extends AbstractStateBackend { * rather than in files * * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds. */ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { - if (fileStateSizeThreshold < 0) { - throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); - } - if (fileStateSizeThreshold > MAX_FILE_STATE_THRESHOLD) { - throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + - MAX_FILE_STATE_THRESHOLD); - } + checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be zero or larger."); + checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, + "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD); + this.fileStateThreshold = fileStateSizeThreshold; - this.basePath = validateAndNormalizeUri(checkpointDataUri); } @@ -163,6 +162,19 @@ public class FsStateBackend extends AbstractStateBackend { return basePath; } + /** + * Gets the threshold below which state is stored as part of the metadata, rather than in files. + * This threshold ensures that the backend does not create a large amount of very small files, + * where potentially the file pointers are larger than the state itself. + * + * <p>By default, this threshold is {@value #DEFAULT_FILE_STATE_THRESHOLD}. + * + * @return The file size threshold, in bytes. 
+ */ + public int getMinFileSizeThreshold() { + return fileStateThreshold; + } + // ------------------------------------------------------------------------ // initialization and cleanup // ------------------------------------------------------------------------ @@ -189,7 +201,8 @@ public class FsStateBackend extends AbstractStateBackend { TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry) throws IOException { + return new HeapKeyedStateBackend<>( kvStateRegistry, keySerializer, http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java index 042700c..4c933ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java @@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackendFactory; +import java.io.IOException; + /** * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} * from a configuration. 
@@ -35,28 +37,26 @@ public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend /** The key under which the config stores the threshold for state to be store in memory, * rather than in files */ public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; - - + + @Override - public FsStateBackend createFromConfig(Configuration config) throws Exception { - String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); - int memoryThreshold = config.getInteger( + public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException { + final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); + final int memoryThreshold = config.getInteger( MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD); - + if (checkpointDirURI == null) { throw new IllegalConfigurationException( "Cannot create the file system state backend: The configuration does not specify the " + "checkpoint directory '" + CHECKPOINT_DIRECTORY_URI_CONF_KEY + '\''); } - + try { Path path = new Path(checkpointDirURI); return new FsStateBackend(path.toUri(), memoryThreshold); } - catch (IllegalArgumentException e) { - throw new Exception("Cannot initialize File System State Backend with URI '" - + checkpointDirURI + '.', e); + catch (IOException | IllegalArgumentException e) { + throw new IllegalConfigurationException("Invalid configuration for the state backend", e); } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java new file mode 100644 index 0000000..2f34ed8 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the classes for key/value state backends that store the state + * on the JVM heap as objects. + */ +package org.apache.flink.runtime.state.heap; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java new file mode 100644 index 0000000..fcc4df9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/package-info.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package holds the classes of the <b>internal state type hierarchy</b>. + * + * <p>The internal state classes give access to the namespace getters and setters and access to + * additional functionality, like raw value access or state merging. + * + * <p>The public API state hierarchy is intended to be programmed against by Flink applications. + * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not + * intended to be used by user applications. These internal methods are considered of limited use to users and + * only confusing, and are usually not regarded as stable across releases. + * + * <p>Each specific type in the internal state hierarchy extends the type from the public + * state hierarchy. 
The following illustrates the relationship between the public- and the internal + * hierarchy at the example of a subset of the classes: + * + * <pre> + * State + * | + * +-------------------InternalKvState + * | | + * MergingState | + * | | + * +-----------------InternalMergingState + * | | + * +--------+------+ | + * | | | + * ReducingState ListState +-----+-----------------+ + * | | | | + * | +----------- -----------------InternalListState + * | | + * +------------------InternalReducingState + * </pre> + */ +package org.apache.flink.runtime.state.internal; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 2cc1164..6e6b034 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -90,7 +90,7 @@ public class MemoryStateBackend extends AbstractStateBackend { TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + TaskKvStateRegistry kvStateRegistry) { return new HeapKeyedStateBackend<>( kvStateRegistry, http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 
9a39182..7ab71cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -62,6 +62,7 @@ public class CheckpointStatsTrackerTest { 191929L, 123, ExternalizedCheckpointSettings.none(), + null, false); CheckpointStatsTracker tracker = new CheckpointStatsTracker( http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 7949ef0..976da48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.junit.Test; @@ -67,7 +66,7 @@ public class CoordinatorShutdownTest { JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, - 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true)); + 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); @@ -126,7 +125,7 @@ public class CoordinatorShutdownTest { JobGraph testGraph = new JobGraph("test 
job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, - 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), true)); + 5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 47e6826..8f565dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -106,7 +106,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { ClassLoader.getSystemClassLoader(), new UnregisteredMetricsGroup()); - executionGraph.enableSnapshotCheckpointing( + executionGraph.enableCheckpointing( 100, 100, 100, @@ -118,6 +118,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { counter, store, null, + null, CheckpointStatsTrackerTest.createTestTracker()); JobVertex jobVertex = new JobVertex("MockVertex"); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java 
index 46ce3f4..3090172 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -112,7 +112,7 @@ public class ArchivedExecutionGraphTest { mock(JobSnapshottingSettings.class), new UnregisteredMetricsGroup()); - runtimeGraph.enableSnapshotCheckpointing( + runtimeGraph.enableCheckpointing( 100, 100, 100, @@ -124,6 +124,7 @@ public class ArchivedExecutionGraphTest { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), null, + null, statsTracker); Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java index 667dbca..2508d5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java @@ -20,11 +20,14 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.Test; import java.util.Arrays; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class JobSnapshottingSettingsTest { @@ -42,6 +45,7 @@ public class JobSnapshottingSettingsTest { 112, 12, 
ExternalizedCheckpointSettings.externalizeCheckpoints(true), + new MemoryStateBackend(), false); JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings); @@ -55,5 +59,7 @@ public class JobSnapshottingSettingsTest { assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints()); assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation()); assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce()); + assertNotNull(copy.getDefaultStateBackend()); + assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index cbb077c..115b06c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -225,6 +225,7 @@ public class JobManagerHARecoveryTest { 0, 1, ExternalizedCheckpointSettings.none(), + null, true)); BlockingStatefulInvokable.initializeStaticHelpers(slots); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index c5f6d99..727fc65 100644 
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -829,6 +829,7 @@ public class JobManagerTest { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -954,6 +955,7 @@ public class JobManagerTest { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -1059,6 +1061,7 @@ public class JobManagerTest { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -1161,6 +1164,7 @@ public class JobManagerTest { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); jobGraph.setSnapshotSettings(snapshottingSettings); @@ -1207,6 +1211,7 @@ public class JobManagerTest { 0, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), + null, true); newJobGraph.setSnapshotSettings(newSnapshottingSettings); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index feb3d4d..529c100 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -229,7 +229,7 @@ public class JobSubmitTest { JobGraph jg = new JobGraph("test job", jobVertex); jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, - 5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), true)); + 5000, 5000, 0L, 10, 
ExternalizedCheckpointSettings.none(), null, true)); return jg; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java new file mode 100644 index 0000000..a64faf1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.DynamicCodeLoadingException; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * This test validates that state backends are properly loaded from configuration. + */ +public class StateBackendLoadingTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + private final ClassLoader cl = getClass().getClassLoader(); + + private final String backendKey = CoreOptions.STATE_BACKEND.key(); + + // ------------------------------------------------------------------------ + + @Test + public void testNoStateBackendDefined() throws Exception { + assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null)); + } + + @Test + public void testInstantiateMemoryBackendByDefault() throws Exception { + StateBackend backend = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + + assertTrue(backend instanceof MemoryStateBackend); + } + + @Test + public void testLoadMemoryStateBackend() throws Exception { + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + final Configuration config = new Configuration(); + config.setString(backendKey, 
"jobmanager"); + + StateBackend backend = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + + assertTrue(backend instanceof MemoryStateBackend); + } + + @Test + public void testLoadFileSystemStateBackend() throws Exception { + final String checkpointDir = new Path(tmp.getRoot().toURI()).toString(); + final Path expectedPath = new Path(checkpointDir); + final int threshold = 1000000; + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "filesystem"); + config1.setString("state.checkpoints.dir", checkpointDir); + config1.setString("state.backend.fs.checkpointdir", checkpointDir); + config1.setInteger("state.backend.fs.memory-threshold", threshold); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, FsStateBackendFactory.class.getName()); + config2.setString("state.checkpoints.dir", checkpointDir); + config2.setString("state.backend.fs.checkpointdir", checkpointDir); + config2.setInteger("state.backend.fs.memory-threshold", threshold); + + StateBackend backend1 = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(config1, cl, null); + + StateBackend backend2 = AbstractStateBackend + .loadStateBackendFromConfigOrCreateDefault(config2, cl, null); + + assertTrue(backend1 instanceof FsStateBackend); + assertTrue(backend2 instanceof FsStateBackend); + + FsStateBackend fs1 = (FsStateBackend) backend1; + FsStateBackend fs2 = (FsStateBackend) backend2; + + assertEquals(expectedPath, fs1.getBasePath()); + assertEquals(expectedPath, fs2.getBasePath()); + assertEquals(threshold, fs1.getMinFileSizeThreshold()); + assertEquals(threshold, fs2.getMinFileSizeThreshold()); + } + + /** + * This test makes sure that failures properly manifest when the state backend could not be loaded. 
+ */ + @Test + public void testLoadingFails() throws Exception { + final Configuration config = new Configuration(); + + // try a value that is neither recognized as a name, nor corresponds to a class + config.setString(backendKey, "does.not.exist"); + try { + AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null); + fail("should fail with an exception"); + } catch (DynamicCodeLoadingException ignored) { + // expected + } + + // try a class that is not a factory + config.setString(backendKey, java.io.File.class.getName()); + try { + AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null); + fail("should fail with an exception"); + } catch (DynamicCodeLoadingException ignored) { + // expected + } + + // a factory that fails + config.setString(backendKey, FailingFactory.class.getName()); + try { + AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(config, cl, null); + fail("should fail with an exception"); + } catch (IOException ignored) { + // expected + } + } + + // ------------------------------------------------------------------------ + + static final class FailingFactory implements StateBackendFactory<StateBackend> { + private static final long serialVersionUID = 1L; + + @Override + public StateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException { + throw new IOException("fail!"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 75f1fd4..31e72dd 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -822,6 +822,7 @@ class JobManagerITCase(_system: ActorSystem) 60000, 1, ExternalizedCheckpointSettings.none, + null, true)) // Submit job... @@ -881,6 +882,7 @@ class JobManagerITCase(_system: ActorSystem) 60000, 1, ExternalizedCheckpointSettings.none, + null, true)) // Submit job... @@ -948,6 +950,7 @@ class JobManagerITCase(_system: ActorSystem) 60000, 1, ExternalizedCheckpointSettings.none, + null, true)) // Submit job... http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index f55ff47..bd018c3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -82,7 +82,7 @@ public class StreamGraphGenerator { public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; // The StreamGraph that is being built, this is initialized at the beginning. 
- private StreamGraph streamGraph; + private final StreamGraph streamGraph; private final StreamExecutionEnvironment env; http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index a4bb165..003eff9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -539,6 +539,7 @@ public class StreamingJobGraphGenerator { cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints(), externalizedCheckpointSettings, + streamGraph.getStateBackend(), isExactlyOnce); jobGraph.setSnapshotSettings(settings); http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 938ffd2..1e208ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -20,9 +20,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; 
-import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; @@ -43,13 +40,10 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StateBackendFactory; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -63,6 +57,7 @@ import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,7 +142,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private StreamConfig configuration; /** Our state backend. We use this to create checkpoint streams and a keyed state backend. */ - private AbstractStateBackend stateBackend; + private StateBackend stateBackend; /** Keyed state backend for the head operator, if it is keyed. There can only ever be one. 
*/ private AbstractKeyedStateBackend<?> keyedStateBackend; @@ -713,61 +708,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // State backend // ------------------------------------------------------------------------ - private AbstractStateBackend createStateBackend() throws Exception { - AbstractStateBackend stateBackend = configuration.getStateBackend(getUserCodeClassLoader()); + private StateBackend createStateBackend() throws Exception { + final StateBackend fromJob = configuration.getStateBackend(getUserCodeClassLoader()); - if (stateBackend != null) { + if (fromJob != null) { // backend has been configured on the environment LOG.info("Using user-defined state backend: {}.", stateBackend); - } else { - // see if we have a backend specified in the configuration - Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); - String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null); - - if (backendName == null) { - LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)"); - backendName = "jobmanager"; - } - - switch (backendName.toLowerCase()) { - case "jobmanager": - LOG.info("State backend is set to heap memory (checkpoint to jobmanager)"); - stateBackend = new MemoryStateBackend(); - break; - - case "filesystem": - FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); - LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", - backend.getBasePath()); - stateBackend = backend; - break; - - case "rocksdb": - backendName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory"; - // fall through to the 'default' case that uses reflection to load the backend - // that way we can keep RocksDB in a separate module - - default: - try { - @SuppressWarnings("rawtypes") - Class<? extends StateBackendFactory> clazz = - Class.forName(backendName, false, getUserCodeClassLoader()). 
- asSubclass(StateBackendFactory.class); - - stateBackend = clazz.newInstance().createFromConfig(flinkConfig); - } catch (ClassNotFoundException e) { - throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName); - } catch (ClassCastException e) { - throw new IllegalConfigurationException("The class configured under '" + - CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" + - backendName + ')'); - } catch (Throwable t) { - throw new IllegalConfigurationException("Cannot create configured state backend", t); - } - } + return fromJob; + } + else { + return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault( + getEnvironment().getTaskManagerInfo().getConfiguration(), + getUserCodeClassLoader(), + LOG); } - - return stateBackend; } public OperatorStateBackend createOperatorStateBackend( http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java index 51294ce..e266ea1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java @@ -183,7 +183,7 @@ public class BlockingCheckpointsTest { public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, - KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) throws Exception { + KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) { throw new 
UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 3d01fdd..3826051 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -807,33 +807,39 @@ public class StreamTaskTest extends TestLogger { private static final long serialVersionUID = 1L; @Override - public AbstractStateBackend createFromConfig(Configuration config) throws Exception { + public AbstractStateBackend createFromConfig(Configuration config) { AbstractStateBackend stateBackendMock = mock(AbstractStateBackend.class); - Mockito.when(stateBackendMock.createOperatorStateBackend( - Mockito.any(Environment.class), - Mockito.any(String.class))) - .thenAnswer(new Answer<OperatorStateBackend>() { - @Override - public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { - return Mockito.mock(OperatorStateBackend.class); - } - }); - - Mockito.when(stateBackendMock.createKeyedStateBackend( - Mockito.any(Environment.class), - Mockito.any(JobID.class), - Mockito.any(String.class), - Mockito.any(TypeSerializer.class), - Mockito.any(int.class), - Mockito.any(KeyGroupRange.class), - Mockito.any(TaskKvStateRegistry.class))) - .thenAnswer(new Answer<AbstractKeyedStateBackend>() { - @Override - public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { - return Mockito.mock(AbstractKeyedStateBackend.class); - } - }); + try { + Mockito.when(stateBackendMock.createOperatorStateBackend( + 
Mockito.any(Environment.class), + Mockito.any(String.class))) + .thenAnswer(new Answer<OperatorStateBackend>() { + @Override + public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + return Mockito.mock(OperatorStateBackend.class); + } + }); + + Mockito.when(stateBackendMock.createKeyedStateBackend( + Mockito.any(Environment.class), + Mockito.any(JobID.class), + Mockito.any(String.class), + Mockito.any(TypeSerializer.class), + Mockito.any(int.class), + Mockito.any(KeyGroupRange.class), + Mockito.any(TaskKvStateRegistry.class))) + .thenAnswer(new Answer<AbstractKeyedStateBackend>() { + @Override + public AbstractKeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + return Mockito.mock(AbstractKeyedStateBackend.class); + } + }); + } + catch (Exception e) { + // this is needed, because the signatures of the mocked methods throw 'Exception' + throw new RuntimeException(e); + } return stateBackendMock; } http://git-wip-us.apache.org/repos/asf/flink/blob/3446e66a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 79665dd..4677242 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -109,7 +109,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase { TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry) throws IOException { throw new SuccessException(); } }
