[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219565#comment-16219565 ]

ASF GitHub Bot commented on FLINK-5823:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4907#discussion_r146991686
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---
    @@ -18,54 +18,92 @@
     
     package org.apache.flink.runtime.state.filesystem;
     
    +import org.apache.flink.annotation.PublicEvolving;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.CheckpointingOptions;
    +import org.apache.flink.configuration.Configuration;
     import org.apache.flink.core.fs.FileSystem;
     import org.apache.flink.core.fs.Path;
     import org.apache.flink.runtime.execution.Environment;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
     import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
    -import org.apache.flink.runtime.state.AbstractStateBackend;
     import org.apache.flink.runtime.state.CheckpointStreamFactory;
    +import org.apache.flink.runtime.state.ConfigurableStateBackend;
     import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
     import org.apache.flink.runtime.state.KeyGroupRange;
     import org.apache.flink.runtime.state.OperatorStateBackend;
     import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
     
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
     import java.io.IOException;
     import java.net.URI;
     
     import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * The file state backend is a state backend that stores the state of streaming jobs in a file system.
    + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
    + * The state backend checkpoints state as files to a file system (hence the backend's name).
    + *
    + * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
    + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}.
    + *
    + * <h1>State Size Considerations</h1>
    + *
    + * Working state is kept on the TaskManager heap. If a TaskManager executes multiple
    + * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
    + * then the aggregate state of all tasks needs to fit into that TaskManager's memory.
    + *
    + * <p>This state backend stores small state chunks directly with the metadata, to avoid creating
    + * many small files. The threshold for that is configurable. When increasing this threshold, the
    + * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
    + * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
    + * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly.
    + *
    + * <h1>Persistence Guarantees</h1>
      *
    - * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
    - * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
    - * files for each state, for example:
    + * Checkpoints from this state backend are as persistent and available as filesystem that is written to.
    + * If the file system is a persistent distributed file system, this state backend supports
    + * highly available setups. The backend additionally supports savepoints and externalized checkpoints.
      *
    - * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
    + * <h1>Configuration</h1>
    + *
    + * As for all state backends, this backend can either be configured within the application (by creating
    + * the backend with the respective constructor parameters and setting it on the execution environment)
    + * or by specifying it in the Flink configuration.
    + *
    + * <p>If the state backend was specified in the application, it may pick up additional configuration
    + * parameters from the Flink configuration. For example, if the backend if configured in the application
    + * without a default savepoint directory, it will pick up a default savepoint directory specified in the
    + * Flink configuration of the running job/cluster. That behavior is implemented via the
    + * {@link #configure(Configuration)} method.
      */
    -public class FsStateBackend extends AbstractStateBackend {
    +@PublicEvolving
    +public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
    --- End diff --
    
    Why isn't `AbstractFileStateBackend` configurable?


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5823
>                 URL: https://issues.apache.org/jira/browse/FLINK-5823
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
