[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320793#comment-16320793
 ] 

ASF GitHub Bot commented on FLINK-5823:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5248#discussion_r160761075
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 ---
    @@ -18,92 +18,272 @@
     
     package org.apache.flink.runtime.state.memory;
     
    +import org.apache.flink.annotation.PublicEvolving;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.CheckpointingOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.Path;
     import org.apache.flink.runtime.execution.Environment;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
     import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
    -import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.CheckpointStorage;
     import org.apache.flink.runtime.state.CheckpointStreamFactory;
    +import org.apache.flink.runtime.state.ConfigurableStateBackend;
     import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
     import org.apache.flink.runtime.state.KeyGroupRange;
     import org.apache.flink.runtime.state.OperatorStateBackend;
    +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
     import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
     
    +import javax.annotation.Nullable;
    +
     import java.io.IOException;
     
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
     /**
    - * A {@link AbstractStateBackend} that stores all its data and checkpoints 
in memory and has no
    - * capabilities to spill to disk. Checkpoints are serialized and the 
serialized data is
    - * transferred
    + * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
    + * The state backend checkpoints state directly to the JobManager's memory 
(hence the backend's name),
    + * but the checkpoints will be persisted to a file system for 
high-availability setups and savepoints.
    + * The MemoryStateBackend is consequently a FileSystem-based backend that 
can work without a
    + * file system dependency in simple setups.
    + *
    + * <p>This state backend should be used only for experimentation, quick 
local setups,
    + * or for streaming applications that have very small state: Because it 
requires checkpoints to
    + * go through the JobManager's memory, larger state will occupy larger 
portions of the JobManager's  
    + * main memory, reducing operational stability.
    + * For any other setup, the {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend}
    + * should be used. The {@code FsStateBackend} holds the working state on 
the TaskManagers in the same way, but
    + * checkpoints state directly to files rather than to the JobManager's 
memory, thus supporting
    + * large state sizes. 
    + *
    + * <h1>State Size Considerations</h1>
    + *
    + * State checkpointing with this state backend is subject to the following 
conditions:
    + * <ul>
    + *     <li>Each individual state must not exceed the configured maximum 
state size
    + *         (see {@link #getMaxStateSize()}).</li>
    + *
    + *     <li>All state from one task (i.e., the sum of all operator states 
and keyed states from all
    + *         chained operators of the task) must not exceed what the RPC 
system supports, which is
    + *         by default < 10 MB. That limit can be configured up, but that 
is typically not advised.</li>
    + *
    + *     <li>The sum of all states in the application times all retained 
checkpoints must comfortably
    + *         fit into the JobManager's JVM heap space.</li>
    + * </ul>
    + *
    + * <h1>Persistence Guarantees</h1>
    + *
    + * For the use cases where the state sizes can be handled by this backend, 
the backend does guarantee
    + * persistence for savepoints, externalized checkpoints (if configured), 
and checkpoints
    + * (when high-availability is configured).
    + *
    + * <h1>Configuration</h1>
    + *
    + * As for all state backends, this backend can either be configured within 
the application (by creating
    + * the backend with the respective constructor parameters and setting it 
on the execution environment)
    + * or by specifying it in the Flink configuration.
    + *
    + * <p>If the state backend was specified in the application, it may pick 
up additional configuration
    + * parameters from the Flink configuration. For example, if the backend is 
configured in the application
    + * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
    + * Flink configuration of the running job/cluster. That behavior is 
implemented via the
    + * {@link #configure(Configuration)} method.
      */
    -public class MemoryStateBackend extends AbstractStateBackend {
    +@PublicEvolving
    +public class MemoryStateBackend extends AbstractFileStateBackend 
implements ConfigurableStateBackend {
     
        private static final long serialVersionUID = 4109305377809414635L;
     
        /** The default maximal size that the snapshotted memory state may have 
(5 MiBytes) */
    -   private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
    +   public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;
     
        /** The maximal size that the snapshotted memory state may have */
        private final int maxStateSize;
     
    -   /** Switch to chose between synchronous and asynchronous snapshots */
    -   private final boolean asynchronousSnapshots;
    +   /** Switch to choose between synchronous and asynchronous snapshots.
    +    * Null if not yet configured, in which case the default will be used. 
*/
    +   @Nullable
    +   private final Boolean asynchronousSnapshots;
    --- End diff --
    
    That makes sense, will do that.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5823
>                 URL: https://issues.apache.org/jira/browse/FLINK-5823
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to