This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a835f31 [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'
a835f31 is described below
commit a835f31a3a78f34b4a80f9e634b34c6a6681a482
Author: Yun Tang <[email protected]>
AuthorDate: Thu May 21 20:37:56 2020 +0800
[FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'
This closes #12282.
---
.../generated/checkpointing_configuration.html | 6 ++--
.../generated/expert_state_backends_section.html | 6 ++--
.../flink/configuration/CheckpointingOptions.java | 7 ++--
.../state/api/output/SavepointOutputFormat.java | 2 +-
.../pyflink/datastream/tests/test_state_backend.py | 2 +-
.../runtime/state/filesystem/FsStateBackend.java | 39 ++++++++++++++--------
.../runtime/state/StateBackendLoadingTest.java | 17 +++++-----
.../flink/test/checkpointing/SavepointITCase.java | 4 +--
.../utils/SavepointMigrationTestBase.java | 3 +-
9 files changed, 50 insertions(+), 36 deletions(-)
diff --git a/docs/_includes/generated/checkpointing_configuration.html
b/docs/_includes/generated/checkpointing_configuration.html
index c8517a5..748bdb2 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -22,9 +22,9 @@
</tr>
<tr>
<td><h5>state.backend.fs.memory-threshold</h5></td>
- <td style="word-wrap: break-word;">1024</td>
- <td>Integer</td>
- <td>The minimum size of state data files. All state chunks smaller
than that are stored inline in the root checkpoint metadata file.</td>
+ <td style="word-wrap: break-word;">20 kb</td>
+ <td>MemorySize</td>
+ <td>The minimum size of state data files. All state chunks smaller
than that are stored inline in the root checkpoint metadata file. The max
memory threshold for this configuration is 1MB.</td>
</tr>
<tr>
<td><h5>state.backend.fs.write-buffer-size</h5></td>
diff --git a/docs/_includes/generated/expert_state_backends_section.html
b/docs/_includes/generated/expert_state_backends_section.html
index 9d50be1..0fed867 100644
--- a/docs/_includes/generated/expert_state_backends_section.html
+++ b/docs/_includes/generated/expert_state_backends_section.html
@@ -16,9 +16,9 @@
</tr>
<tr>
<td><h5>state.backend.fs.memory-threshold</h5></td>
- <td style="word-wrap: break-word;">1024</td>
- <td>Integer</td>
- <td>The minimum size of state data files. All state chunks smaller
than that are stored inline in the root checkpoint metadata file.</td>
+ <td style="word-wrap: break-word;">20 kb</td>
+ <td>MemorySize</td>
+ <td>The minimum size of state data files. All state chunks smaller
than that are stored inline in the root checkpoint metadata file. The max
memory threshold for this configuration is 1MB.</td>
</tr>
<tr>
<td><h5>state.backend.fs.write-buffer-size</h5></td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index df19ab9..16eaf75 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -140,11 +140,12 @@ public class CheckpointingOptions {
/** The minimum size of state data files. All state chunks smaller than
that
* are stored inline in the root checkpoint metadata file. */
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
- public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD =
ConfigOptions
+ public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD =
ConfigOptions
.key("state.backend.fs.memory-threshold")
- .defaultValue(1024)
+ .memoryType()
+ .defaultValue(MemorySize.parse("20kb"))
.withDescription("The minimum size of state data files.
All state chunks smaller than that are stored" +
- " inline in the root checkpoint metadata
file.");
+ " inline in the root checkpoint metadata file.
The max memory threshold for this configuration is 1MB.");
/**
* The default size of the write buffer for the checkpoint streams that
write to file systems.
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
index 8235067..bebd435 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -93,7 +93,7 @@ public class SavepointOutputFormat extends
RichOutputFormat<CheckpointMetadata>
location,
location,
reference,
-
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
+ (int)
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
}
}
diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py
b/flink-python/pyflink/datastream/tests/test_state_backend.py
index 4f3249f..3ce18dc 100644
--- a/flink-python/pyflink/datastream/tests/test_state_backend.py
+++ b/flink-python/pyflink/datastream/tests/test_state_backend.py
@@ -97,7 +97,7 @@ class FsStateBackendTests(PyFlinkTestCase):
state_backend = FsStateBackend("file://var/checkpoints/")
- self.assertEqual(state_backend.get_min_file_size_threshold(), 1024)
+ self.assertEqual(state_backend.get_min_file_size_threshold(), 20480)
state_backend = FsStateBackend("file://var/checkpoints/",
file_state_size_threshold=2048)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e45fb20..1c61ab6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.MathUtils;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Collection;
+import static
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -363,22 +365,24 @@ public class FsStateBackend extends
AbstractFileStateBackend implements Configur
this.asynchronousSnapshots =
original.asynchronousSnapshots.resolveUndefined(
configuration.get(CheckpointingOptions.ASYNC_SNAPSHOTS));
- final int sizeThreshold = original.fileStateThreshold >= 0 ?
- original.fileStateThreshold :
-
configuration.get(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
+ if (getValidFileStateThreshold(original.fileStateThreshold) >=
0) {
+ this.fileStateThreshold = original.fileStateThreshold;
+ } else {
+ final int configuredStateThreshold =
+
getValidFileStateThreshold(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes());
- if (sizeThreshold >= 0 && sizeThreshold <=
MAX_FILE_STATE_THRESHOLD) {
- this.fileStateThreshold = sizeThreshold;
- }
- else {
- this.fileStateThreshold =
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+ if (configuredStateThreshold >= 0) {
+ this.fileStateThreshold =
configuredStateThreshold;
+ } else {
+ this.fileStateThreshold =
MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
- // because this is the only place we (unlikely) ever
log, we lazily
- // create the logger here
-
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
+ // because this is the only place we (unlikely)
ever log, we lazily
+ // create the logger here
+
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
"Ignoring invalid file size threshold
value ({}): {} - using default value {} instead.",
-
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
-
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
+ FS_SMALL_FILE_THRESHOLD.key(),
configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(),
+ FS_SMALL_FILE_THRESHOLD.defaultValue());
+ }
}
final int bufferSize = original.writeBufferSize >= 0 ?
@@ -388,6 +392,13 @@ public class FsStateBackend extends
AbstractFileStateBackend implements Configur
this.writeBufferSize = Math.max(bufferSize,
this.fileStateThreshold);
}
+ private int getValidFileStateThreshold(long fileStateThreshold) {
+ if (fileStateThreshold >= 0 && fileStateThreshold <=
MAX_FILE_STATE_THRESHOLD) {
+ return (int) fileStateThreshold;
+ }
+ return -1;
+ }
+
//
------------------------------------------------------------------------
// Properties
//
------------------------------------------------------------------------
@@ -432,7 +443,7 @@ public class FsStateBackend extends
AbstractFileStateBackend implements Configur
public int getMinFileSizeThreshold() {
return fileStateThreshold >= 0 ?
fileStateThreshold :
-
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+
MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
}
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index 877a51d..5340290 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -227,7 +228,7 @@ public class StateBackendLoadingTest {
final String savepointDir = new
Path(tmp.newFolder().toURI()).toString();
final Path expectedCheckpointsPath = new Path(checkpointDir);
final Path expectedSavepointsPath = new Path(savepointDir);
- final int threshold = 1000000;
+ final MemorySize threshold = MemorySize.parse("900kb");
final int minWriteBufferSize = 1024;
final boolean async =
!CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
@@ -237,7 +238,7 @@ public class StateBackendLoadingTest {
config1.setString(backendKey, "filesystem");
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir);
config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir);
-
config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+ config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
threshold);
config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE,
minWriteBufferSize);
config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
@@ -245,7 +246,7 @@ public class StateBackendLoadingTest {
config2.setString(backendKey,
FsStateBackendFactory.class.getName());
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir);
config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir);
-
config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+ config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
threshold);
config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE,
minWriteBufferSize);
config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
@@ -262,10 +263,10 @@ public class StateBackendLoadingTest {
assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
- assertEquals(threshold, fs1.getMinFileSizeThreshold());
- assertEquals(threshold, fs2.getMinFileSizeThreshold());
- assertEquals(Math.max(threshold, minWriteBufferSize),
fs1.getWriteBufferSize());
- assertEquals(Math.max(threshold, minWriteBufferSize),
fs2.getWriteBufferSize());
+ assertEquals(threshold.getBytes(),
fs1.getMinFileSizeThreshold());
+ assertEquals(threshold.getBytes(),
fs2.getMinFileSizeThreshold());
+ assertEquals(Math.max(threshold.getBytes(),
minWriteBufferSize), fs1.getWriteBufferSize());
+ assertEquals(Math.max(threshold.getBytes(),
minWriteBufferSize), fs2.getWriteBufferSize());
assertEquals(async, fs1.isUsingAsynchronousSnapshots());
assertEquals(async, fs2.isUsingAsynchronousSnapshots());
}
@@ -293,7 +294,7 @@ public class StateBackendLoadingTest {
config.setString(backendKey, "jobmanager"); // this should not
be picked up
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir);
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
20); // this should not be picked up
+ config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
MemorySize.parse("20")); // this should not be picked up
config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE,
3000000); // this should not be picked up
final StateBackend loadedBackend =
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index be9b706..abdc188 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -576,7 +576,7 @@ public class SavepointITCase extends TestLogger {
if (data == null) {
// We need this to be large, because we want to
test with files
Random rand = new
Random(getRuntimeContext().getIndexOfThisSubtask());
- data = new
byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1];
+ data = new byte[(int)
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes() + 1];
rand.nextBytes(data);
}
}
@@ -833,7 +833,7 @@ public class SavepointITCase extends TestLogger {
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND,
"filesystem");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
0);
+ config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir);
return config;
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 638edb5..1e7f619 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -109,7 +110,7 @@ public abstract class SavepointMigrationTestBase extends
TestBaseUtils {
config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
- config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
0);
+ config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir.toURI().toString());
config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
300L);