kbendick commented on a change in pull request #1515:
URL: https://github.com/apache/iceberg/pull/1515#discussion_r495626186
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -57,6 +57,9 @@
private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+ public static final String FLINK_ICEBERG_SINK_FLUSHINTERVAL = "flink.iceberg.sink.flushinterval";
Review comment:
Hi @simonsssu. Thanks for the quick response.
To me, the suggested new name is potentially more confusing: it introduces the term `commit interval` into the configuration for the `IcebergFilesCommitter`, but it only covers a small (arguably edge) case, namely jobs that do not checkpoint. I think that for many Flink users, seeing the configuration `flink.iceberg.sink.commit.interval` will still strongly suggest that this is the main parameter to tune, regardless of checkpoints. But my understanding is that we rely on:
1. Checkpoints to know when to commit.
2. This configuration parameter to commit when checkpointing is not enabled.
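To make the two triggers above concrete, here is a minimal Java sketch of the rule as I understand it. `CommitTriggerSketch`, the `Trigger` enum and `triggerFor` are hypothetical names for illustration, not the actual `IcebergFilesCommitter` code; rejecting a missing interval on a non-checkpointing job is also an assumption of the sketch.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of when the sink commits; not the real
 * IcebergFilesCommitter logic. All names here are illustrative.
 */
public final class CommitTriggerSketch {

    enum Trigger { ON_CHECKPOINT_COMPLETE, ON_FLUSH_INTERVAL }

    /**
     * Checkpointing enabled: commit when a checkpoint completes, and the
     * interval plays no role. Checkpointing disabled: fall back to the
     * configured interval.
     */
    static Trigger triggerFor(boolean checkpointingEnabled, Duration flushInterval) {
        if (checkpointingEnabled) {
            return Trigger.ON_CHECKPOINT_COMPLETE;
        }
        // Assumption of this sketch: without checkpoints, a positive
        // interval is the only way files ever get committed.
        if (flushInterval == null || flushInterval.isZero() || flushInterval.isNegative()) {
            throw new IllegalArgumentException(
                "A positive flush interval is required when checkpointing is disabled");
        }
        return Trigger.ON_FLUSH_INTERVAL;
    }

    public static void main(String[] args) {
        System.out.println(triggerFor(true, null));
        System.out.println(triggerFor(false, Duration.ofSeconds(30)));
    }
}
```

The point of the sketch is that the interval is only consulted on the second branch, which is why I would like the name to say so.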
In Flink, when a checkpoint happens, there are possibly several commits. In a ZooKeeper-based HA setup with checkpointing enabled, the last one is typically the commit of the ID of the last checkpoint to ZooKeeper. That is by far the most common setup amongst my Flink users (I have just one user with a single job that does not checkpoint).
As an example of where `commit` is already an established term tied to checkpoints in Flink, the documentation for [High Availability in the Flink Configuration Docs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#high-availability) uses "commit" in the following sentence to refer to the JobManager committing metadata to ZooKeeper:
```
For the JobManager itself to recover consistently, an external service must store a minimal amount of recovery metadata (like “ID of last committed checkpoint”)
```
I'd still really prefer to see something in the configuration name indicating that this interval applies _only when not checkpointing_. I worry, and have observed, that people frequently don't read the docs as closely as they should and then run into issues. However, I do recognize that we probably don't want to make the configuration names overly verbose.
What about `flink.iceberg.non-checkpointing-job.sink-flush-interval`, or, to be more in line with Flink's naming pattern for configurations, `flink.iceberg.non.checkpointing.job.sink-commit-interval`? The latter is more in line with Flink's own cluster-level parameter for controlling the checkpoint interval, [execution.checkpointing.interval](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval).
Additionally, the use of "interval" here corresponds nicely to the description of the Flink configuration parameter that controls the maximum interval at which network buffers are flushed, [execution.buffer-timeout](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-buffer-timeout). So the name could possibly mirror `execution.buffer-timeout`, which controls the maximum time between flushes of (task-to-task / shuffle) network buffers.
Another name could be
`flink.iceberg.non-checkpointing-sink-buffer-flush-interval`.
Of all the options I mentioned above, the ones I am leaning towards as most clearly explaining the parameter are `flink.iceberg.non.checkpointing.job.sink-flush-interval` and `flink.iceberg.sink-flush-interval-for-non-checkpointing-jobs`. I think the second is the clearest, as it has `sink` right after `iceberg` and ends with `for-non-checkpointing-jobs`. I'd still like to see it documented in the code via a comment, and ideally in the Flink docs on the website as well.
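As a rough idea of what that code comment could look like, here is a sketch using the second name above. The holder class `FlinkSinkOptionsSketch` and the constant name are hypothetical; only the string key comes from this discussion, and the Javadoc describes the behavior as I currently understand it.

```java
/**
 * Hypothetical holder for the option discussed in this review;
 * not the actual FlinkSink class.
 */
public final class FlinkSinkOptionsSketch {

    private FlinkSinkOptionsSketch() {
    }

    /**
     * Interval at which the Iceberg sink commits data files, used ONLY when
     * the Flink job does not have checkpointing enabled.
     *
     * <p>When checkpointing is enabled (for example via
     * {@code execution.checkpointing.interval}), files are committed when a
     * checkpoint completes, and this option is not used.
     */
    public static final String SINK_FLUSH_INTERVAL_FOR_NON_CHECKPOINTING_JOBS =
        "flink.iceberg.sink-flush-interval-for-non-checkpointing-jobs";

    public static void main(String[] args) {
        System.out.println(SINK_FLUSH_INTERVAL_FOR_NON_CHECKPOINTING_JOBS);
    }
}
```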
What do you think of these names, @simonsssu? I'd love to hear any suggestions for alternative config names, especially ones that emphasize that this is for non-checkpointing jobs, though I will admit that I think `flink.iceberg.sink.commit.interval` adds further confusion. If, for example, I were a new developer in an organization that used Flink, writing my first job by copy-pasting from another job to create a skeleton, and I saw `flink.iceberg.sink.commit.interval`, I would assume it was the one and only parameter controlling when files are committed. But as I understand it, files are committed when the job checkpoints, and we only fall back to this parameter if the job doesn't checkpoint.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.