Zakelly commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1560576402
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
* org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
*/
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
         synchronized (lock) {
             if (closed) {
                 throw new IllegalStateException(
                         "TaskExecutorFileMergingManager is already closed and cannot "
                                 + "register a new FileMergingSnapshotManager.");
             }
+            if (!mergingEnabled) {
+                return null;
+            }
             FileMergingSnapshotManager fileMergingSnapshotManager =
                     fileMergingSnapshotManagerByJobId.get(jobId);
-            if (fileMergingSnapshotManager == null) {
-                // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
+            if (fileMergingSnapshotManager == null && mergingEnabled) {
+                FileMergingType fileMergingType =
+                        jobConfiguration
+                                .getOptional(FILE_MERGING_ACROSS_BOUNDARY)
+                                .orElse(
+                                        clusterConfiguration.getBoolean(
+                                                FILE_MERGING_ACROSS_BOUNDARY))
+                                ? FileMergingType.MERGE_ACROSS_CHECKPOINT
+                                : FileMergingType.MERGE_WITHIN_CHECKPOINT;
+                MemorySize maxFileSize =
+                        jobConfiguration
+                                .getOptional(FILE_MERGING_MAX_FILE_SIZE)
+                                .orElse(clusterConfiguration.get(FILE_MERGING_MAX_FILE_SIZE));
+                Boolean usingBlockingPoll =
Review Comment:
nit. `usingBlockingPool`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
* org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
*/
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
         synchronized (lock) {
             if (closed) {
                 throw new IllegalStateException(
                         "TaskExecutorFileMergingManager is already closed and cannot "
                                 + "register a new FileMergingSnapshotManager.");
             }
+            if (!mergingEnabled) {
+                return null;
+            }
             FileMergingSnapshotManager fileMergingSnapshotManager =
                     fileMergingSnapshotManagerByJobId.get(jobId);
-            if (fileMergingSnapshotManager == null) {
-                // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
+            if (fileMergingSnapshotManager == null && mergingEnabled) {
Review Comment:
No need to check `mergingEnabled` here? If it is false, the function has already returned above.
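For illustration, a minimal sketch of the simplified condition (assuming the rest of the method stays as in the diff above; the body of the branch is elided):

```java
// At this point mergingEnabled is guaranteed to be true, because the method
// already returned null above when file merging is disabled, so checking the
// null case alone is enough.
FileMergingSnapshotManager fileMergingSnapshotManager =
        fileMergingSnapshotManagerByJobId.get(jobId);
if (fileMergingSnapshotManager == null) {
    // ... build and register the FileMergingSnapshotManager as in the diff ...
}
```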
##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
<td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
Review Comment:
Is it possible to adjust the order of the documentation entries? This one would better be the first.
##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
<td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
Review Comment:
```suggestion
            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
```
##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
<td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-file-size</h5></td>
+            <td style="word-wrap: break-word;">32 mb</td>
+            <td>MemorySize</td>
+            <td>Max size of a physical file for merged checkpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Float</td>
+            <td>A threshold that triggers a compaction (re-uploading) of one physical file. If the amount of invalid data in a physical file exceeds the threshold, a new physical file will be created and uploaded.</td>
Review Comment:
```suggestion
            <td>Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The larger the space amplification is, the more space will be wasted. This configures the space amplification above which a re-uploading of physical files will be triggered to reclaim space.</td>
```
##########
flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java:
##########
@@ -107,6 +107,8 @@ public static final class Sections {
public static final String TRACE_REPORTERS = "trace_reporters";
+ public static final String FILE_MERGING = "file_merging";
Review Comment:
How about naming it `String CHECKPOINT_FILE_MERGING = "checkpoint_file_merging";`, since `FILE_MERGING` seems ambiguous here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
* org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
*/
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
Review Comment:
Use `get` instead of `getBoolean` since the latter is deprecated
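For example, a sketch of the same lookup with the non-deprecated accessor (behavior unchanged; `Configuration#get(ConfigOption)` replaces `getBoolean`):

```java
// Prefer the typed accessor over the deprecated getBoolean(ConfigOption).
boolean mergingEnabled =
        jobConfiguration
                .getOptional(FILE_MERGING_ENABLED)
                .orElse(clusterConfiguration.get(FILE_MERGING_ENABLED));
```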
##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
<td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td>
Review Comment:
Maybe add some description: This is an experimental feature under evaluation.
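For instance, a hedged sketch of where that extra sentence could go, appended to the option description in CheckpointingOptions (the wording is only the suggestion above, not necessarily the final text):

```java
public static final ConfigOption<Boolean> FILE_MERGING_ENABLED =
        ConfigOptions.key("state.checkpoints.file-merging.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to enable merging multiple checkpoint files into one, which will greatly reduce"
                                + " the number of small checkpoint files."
                                // Suggested addition; exact wording to be decided in the PR.
                                + " This is an experimental feature under evaluation.");
```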
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
* org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
*/
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
         synchronized (lock) {
             if (closed) {
                 throw new IllegalStateException(
                         "TaskExecutorFileMergingManager is already closed and cannot "
                                 + "register a new FileMergingSnapshotManager.");
             }
+            if (!mergingEnabled) {
+                return null;
+            }
             FileMergingSnapshotManager fileMergingSnapshotManager =
                     fileMergingSnapshotManagerByJobId.get(jobId);
-            if (fileMergingSnapshotManager == null) {
-                // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
+            if (fileMergingSnapshotManager == null && mergingEnabled) {
+                FileMergingType fileMergingType =
+                        jobConfiguration
+                                .getOptional(FILE_MERGING_ACROSS_BOUNDARY)
+                                .orElse(
+                                        clusterConfiguration.getBoolean(
Review Comment:
Same as above
##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
<td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-file-size</h5></td>
+            <td style="word-wrap: break-word;">32 mb</td>
+            <td>MemorySize</td>
+            <td>Max size of a physical file for merged checkpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
Review Comment:
And this should be some value bigger than 1
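As a purely illustrative sketch of that point (space amplification is occupied space over valid data, so it is never below 1; the concrete default is not decided here and `2f` below is only a placeholder, not a value from the PR):

```java
// A compaction threshold on space amplification only makes sense above 1,
// since the ratio of occupied space to valid data is at least 1.
public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION =
        ConfigOptions.key("state.checkpoints.file-merging.max-space-amplification")
                .floatType()
                .defaultValue(2f) // placeholder; any value > 1 expresses the intent
                .withDescription(
                        "Space amplification above which a re-uploading of physical files is triggered to reclaim space.");
```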
##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -331,4 +332,103 @@ public class CheckpointingOptions {
                             + StateRecoveryOptions.LOCAL_RECOVERY.key()
                             + ". By default, local backup is deactivated. Local backup currently only "
                             + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).");
+
+    // ------------------------------------------------------------------------
+    //  Options related to file merging
+    // ------------------------------------------------------------------------
+
+    /**
+     * Whether to enable merging multiple checkpoint files into one, which will greatly reduce the
+     * number of small checkpoint files. See FLIP-306 for details.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Boolean> FILE_MERGING_ENABLED =
+            ConfigOptions.key("state.checkpoints.file-merging.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable merging multiple checkpoint files into one, which will greatly reduce"
+                                    + " the number of small checkpoint files.");
+
+    /**
+     * Whether to allow merging data of multiple checkpoints into one physical file. If this option
+     * is set to false, only files within checkpoint boundaries will be merged. Otherwise, it is
+     * possible for the logical files of different checkpoints to share the same physical file.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Boolean> FILE_MERGING_ACROSS_BOUNDARY =
+            ConfigOptions.key("state.checkpoints.file-merging.across-checkpoint-boundary")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Only relevant if %s is enabled.",
+                                            TextElement.code(FILE_MERGING_ENABLED.key()))
+                                    .linebreak()
+                                    .text(
+                                            "Whether to allow merging data of multiple checkpoints into one physical file. "
+                                                    + "If this option is set to false, "
+                                                    + "only merge files within checkpoint boundaries will be merged. "
+                                                    + "Otherwise, it is possible for the logical files of different "
+                                                    + "checkpoints to share the same physical file.")
+                                    .build());
+
+    /** The max size of a physical file for merged checkpoints. */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<MemorySize> FILE_MERGING_MAX_FILE_SIZE =
+            ConfigOptions.key("state.checkpoints.file-merging.max-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("32MB"))
+                    .withDescription("Max size of a physical file for merged checkpoints.");
+
+    /**
+     * Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool
+     * will always provide usable physical file without blocking. It may create many physical files
+     * if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until
+     * the file is returned.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Boolean> FILE_MERGING_POOL_BLOCKING =
+            ConfigOptions.key("state.checkpoints.file-merging.pool-blocking")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to use Blocking or Non-Blocking pool for merging physical files. "
+                                    + "A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. "
+                                    + "When poll a small file from a Blocking pool, it may be blocked until the file is returned.");
+
+    /**
+     * The upper limit of the file pool size based on the number of subtasks within each TM (only
+     * for merging private state at Task Manager level).
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Integer> FILE_MERGING_MAX_SUBTASKS_PER_FILE =
+            ConfigOptions.key("state.checkpoints.file-merging.max-subtasks-per-file")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "The upper limit of the file pool size based on the number of subtasks within each TM"
+                                    + "(only for merging private state at Task Manager level).");
+
+    /**
+     * A threshold that triggers a compaction (re-uploading) of one physical file. If the amount of
+     * invalid data in a physical file exceeds the threshold, a new physical file will be created
+     * and uploaded.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION =
Review Comment:
I'd suggest annotating `FILE_MERGING_MAX_SPACE_AMPLIFICATION`, `FILE_MERGING_MAX_SUBTASKS_PER_FILE` and `FILE_MERGING_POOL_BLOCKING` with `@Documentation.ExcludeFromDocumentation`, since these are yet to be implemented or users are not able to specify them currently.
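A minimal sketch of how one of these options could carry the suggested annotation (the same pattern would apply to the other two; the description text is shortened here for brevity):

```java
@Experimental
@Documentation.ExcludeFromDocumentation // not yet implemented / not user-configurable
public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION =
        ConfigOptions.key("state.checkpoints.file-merging.max-space-amplification")
                .floatType()
                .defaultValue(0.75f)
                .withDescription(
                        "A threshold that triggers a compaction (re-uploading) of one physical file.");
```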
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]