Repository: flink Updated Branches: refs/heads/master d139e6340 -> 2906698b4
[FLINK-8053] [checkpoints] Default to asynchronous snapshots for FsStateBackend and MemoryStateBackend. This closes #5005. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2906698b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2906698b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2906698b Branch: refs/heads/master Commit: 2906698b4a87f21c6fd099cf8a028f68fc311b1f Parents: d139e63 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Nov 13 14:31:45 2017 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Nov 14 11:52:32 2017 +0100 ---------------------------------------------------------------------- docs/ops/state/large_state_tuning.md | 6 +- docs/ops/state/state_backends.md | 5 +- .../state/filesystem/FsStateBackend.java | 4 +- .../state/memory/MemoryStateBackend.java | 2 +- ...HeapKeyedStateBackendAsyncByDefaultTest.java | 86 ++++++++++++++++++++ 5 files changed, 93 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/docs/ops/state/large_state_tuning.md ---------------------------------------------------------------------- diff --git a/docs/ops/state/large_state_tuning.md b/docs/ops/state/large_state_tuning.md index dd3e404..85ffd99 100644 --- a/docs/ops/state/large_state_tuning.md +++ b/docs/ops/state/large_state_tuning.md @@ -113,11 +113,9 @@ To get state to be snapshotted asynchronously, applications have to do two thing interfaces like `ValueState`, `ListState`, `ReducingState`, ... 2. Use a state backend that supports asynchronous snapshots. In Flink 1.2, only the RocksDB state backend uses - fully asynchronous snapshots. - -The above two points imply that (in Flink 1.2) large state should generally be kept as keyed state, not as operator state. -This is subject to change with the planned introduction of *managed operator state*. + fully asynchronous snapshots. Starting from Flink 1.3, heap-based state backends also support asynchronous snapshots. +The above two points imply that large state should generally be kept as keyed state, not as operator state. ## Tuning RocksDB http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/docs/ops/state/state_backends.md ---------------------------------------------------------------------- diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md index 422df3e..b32ad9f 100644 --- a/docs/ops/state/state_backends.md +++ b/docs/ops/state/state_backends.md @@ -81,11 +81,10 @@ The *FsStateBackend* is configured with a file system URL (type, address, path), The FsStateBackend holds in-flight data in the TaskManager's memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint). -The FsStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is a new feature and currently not enabled -by default. To enable this feature, users can instantiate a `FsStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.: +The FsStateBackend uses *asynchronous snapshots by default* to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate a `FsStateBackend` with the corresponding boolean flag in the constructor set to `false`, e.g.: {% highlight java %} - new FsStateBackend(path, true); + new FsStateBackend(path, false); {% endhighlight %} The FsStateBackend is encouraged for: http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index ddfa85c..952988f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -163,7 +163,7 @@ public class FsStateBackend extends AbstractStateBackend { * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(URI checkpointDataUri) throws IOException { - this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, false); + this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, true); } /** @@ -208,7 +208,7 @@ public class FsStateBackend extends AbstractStateBackend { */ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { - this(checkpointDataUri, fileStateSizeThreshold, false); + this(checkpointDataUri, fileStateSizeThreshold, true); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 7ed1dea..b8ebedf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -65,7 +65,7 @@ public class MemoryStateBackend extends AbstractStateBackend { * @param maxStateSize The maximal size of the serialized state */ public MemoryStateBackend(int maxStateSize) { - this(maxStateSize, false); + this(maxStateSize, true); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2906698b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java new file mode 100644 index 0000000..ac4cbeb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.IOUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +/** + * This tests that all heap-based {@link StateBackend}s create {@link KeyedStateBackend}s that use asynchronous + * snapshots by default. + */ +public class HeapKeyedStateBackendAsyncByDefaultTest { + + @Test + public void testFsStateBackendDefaultsToAsync() throws Exception { + TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + + File folder = temporaryFolder.newFolder(); + + try { + // This backend has two constructors that use a default value for async snapshots. + FsStateBackend fsStateBackend = new FsStateBackend(folder.toURI()); + validateSupportForAsyncSnapshots(fsStateBackend); + + fsStateBackend = new FsStateBackend(folder.toURI(), 1024); + validateSupportForAsyncSnapshots(fsStateBackend); + } finally { + folder.delete(); + temporaryFolder.delete(); + } + } + + @Test + public void testMemoryStateBackendDefaultsToAsync() throws Exception { + MemoryStateBackend memoryStateBackend = new MemoryStateBackend(); + validateSupportForAsyncSnapshots(memoryStateBackend); + } + + private void validateSupportForAsyncSnapshots(AbstractStateBackend backend) throws IOException { + + AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend( + new DummyEnvironment("Test", 1, 0), + new JobID(), + "testOperator", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + null + ); + + try { + Assert.assertTrue(keyedStateBackend.supportsAsynchronousSnapshots()); + } finally { + IOUtils.closeQuietly(keyedStateBackend); + keyedStateBackend.dispose(); + } + } +}