[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320793#comment-16320793 ]
ASF GitHub Bot commented on FLINK-5823: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5248#discussion_r160761075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java --- @@ -18,92 +18,272 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import javax.annotation.Nullable; + import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no - * capabilities to spill to disk. Checkpoints are serialized and the serialized data is - * transferred + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. 
+ * The state backend checkpoints state directly to the JobManager's memory (hence the backend's name), + * but the checkpoints will be persisted to a file system for high-availability setups and savepoints. + * The MemoryStateBackend is consequently a FileSystem-based backend that can work without a + * file system dependency in simple setups. + * + * <p>This state backend should be used only for experimentation, quick local setups, + * or for streaming applications that have very small state: Because it requires checkpoints to + * go through the JobManager's memory, larger state will occupy larger portions of the JobManager's + * main memory, reducing operational stability. + * For any other setup, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} + * should be used. The {@code FsStateBackend} holds the working state on the TaskManagers in the same way, but + * checkpoints state directly to files rather than to the JobManager's memory, thus supporting + * large state sizes. + * + * <h1>State Size Considerations</h1> + * + * State checkpointing with this state backend is subject to the following conditions: + * <ul> + * <li>Each individual state must not exceed the configured maximum state size + * (see {@link #getMaxStateSize()}).</li> + * + * <li>All state from one task (i.e., the sum of all operator states and keyed states from all + * chained operators of the task) must not exceed what the RPC system supports, which is + * by default < 10 MB. 
That limit can be configured up, but that is typically not advised.</li> + * + * <li>The sum of all states in the application times all retained checkpoints must comfortably + * fit into the JobManager's JVM heap space.</li> + * </ul> + * + * <h1>Persistence Guarantees</h1> + * + * For the use cases where the state sizes can be handled by this backend, the backend does guarantee + * persistence for savepoints, externalized checkpoints (if configured), and checkpoints + * (when high-availability is configured). + * + * <h1>Configuration</h1> + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * <p>If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. 
*/ -public class MemoryStateBackend extends AbstractStateBackend { +@PublicEvolving +public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend { private static final long serialVersionUID = 4109305377809414635L; /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */ - private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; + public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; /** The maximal size that the snapshotted memory state may have */ private final int maxStateSize; - /** Switch to chose between synchronous and asynchronous snapshots */ - private final boolean asynchronousSnapshots; + /** Switch to choose between synchronous and asynchronous snapshots. + * Null if not yet configured, in which case the default will be used. */ + @Nullable + private final Boolean asynchronousSnapshots; --- End diff -- That makes sense, will do that. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > ----------------------------------------------------------------------- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)