[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219565#comment-16219565 ]
ASF GitHub Bot commented on FLINK-5823:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4907#discussion_r146991686
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---
@@ -18,54 +18,92 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.URI;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The file state backend is a state backend that stores the state of streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence the backend's name).
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * Working state is kept on the TaskManager heap. If a TaskManager executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that TaskManager's memory.
+ *
+ * <p>This state backend stores small state chunks directly with the metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly.
+ *
+ * <h1>Persistence Guarantees</h1>
*
- * <p>The state backend has one core directory into which it puts all checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
- * files for each state, for example:
+ * Checkpoints from this state backend are as persistent and available as the file system that is written to.
+ * If the file system is a persistent distributed file system, this state backend supports
+ * highly available setups. The backend additionally supports savepoints and externalized checkpoints.
*
- * {@code hdfs://namenode:port/flink-checkpoints/<job-id>/chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ * <h1>Configuration</h1>
+ *
+ * As for all state backends, this backend can either be configured within the application (by creating
+ * the backend with the respective constructor parameters and setting it on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * <p>If the state backend was specified in the application, it may pick up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is configured in the application
+ * without a default savepoint directory, it will pick up a default savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is implemented via the
+ * {@link #configure(Configuration)} method.
*/
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {
--- End diff --
Why isn't `AbstractFileStateBackend` configurable?
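
For readers following the thread, the fallback behavior that the Javadoc in the diff describes (a backend created in the application without a savepoint directory picks one up from the cluster configuration) might be sketched roughly as below. This is only an illustration under assumptions: `SketchBackend`, the key `sketch.savepoint.dir`, and the use of a plain `Map` as the configuration are all hypothetical stand-ins and not Flink's actual classes or option names.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for a configurable backend; not Flink's actual API. */
final class SketchBackend {
    // Hypothetical configuration key, chosen for illustration only.
    static final String SAVEPOINT_DIR_KEY = "sketch.savepoint.dir";

    private final String checkpointDir;
    private final String savepointDir; // null when the application did not set one

    SketchBackend(String checkpointDir, String savepointDir) {
        this.checkpointDir = Objects.requireNonNull(checkpointDir);
        this.savepointDir = savepointDir;
    }

    /**
     * Returns a new backend in which options left unset by the application
     * are filled in from the cluster configuration. Values set in the
     * application take precedence over the configuration.
     */
    SketchBackend configure(Map<String, String> clusterConfig) {
        String resolved = savepointDir != null
                ? savepointDir
                : clusterConfig.get(SAVEPOINT_DIR_KEY);
        return new SketchBackend(checkpointDir, resolved);
    }

    String getCheckpointDir() {
        return checkpointDir;
    }

    Optional<String> getSavepointDir() {
        return Optional.ofNullable(savepointDir);
    }
}
```

If the shared base class carried the directory fields, a method of this shape could presumably live there as well, which seems to be what the question above is getting at.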
> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.4.0
>
>