simonsssu commented on a change in pull request #1515:
URL: https://github.com/apache/iceberg/pull/1515#discussion_r496432799



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -57,6 +57,9 @@
   private static final String ICEBERG_STREAM_WRITER_NAME = 
IcebergStreamWriter.class.getSimpleName();
   private static final String ICEBERG_FILES_COMMITTER_NAME = 
IcebergFilesCommitter.class.getSimpleName();
 
+  public static final String FLINK_ICEBERG_SINK_FLUSHINTERVAL = 
"flink.iceberg.sink.flushinterval";

Review comment:
   > Hi @simonsssu. Thanks for the quick response.
   > 
   > To me, the new name suggested is potentially more confusing, as it 
introduces the term `commit interval` into the configuration for the 
`IcebergFilesCommitter`, but it only covers a small (arguably edge) case for 
jobs that do not checkpoint. I think, for many Flink users, seeing the 
configuration `flink.iceberg.sink.commit.interval` will still very much lead 
them to believe that this is the main parameter to tune regardless of 
checkpoints. But my understanding is that we rely on:
   > 
   > 1. Checkpoints to know when to commit.
   > 2. This configuration parameter to commit when checkpointing is not 
enabled.
   > 
   > In Flink, when a checkpoint happens, there are possibly several commits, 
typically the last one being the commit of the ID of the last checkpoint to 
ZooKeeper in a ZooKeeper-based HA setup with checkpointing enabled, which is 
by far the most common setup among my Flink users (I have one user with only 
one job that does not checkpoint).
   > 
   > To show an example of where `commit` is already a preexisting term with 
ties to checkpoints in Flink, under the documentation for [High Availability in 
the Flink Configuration 
Docs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#high-availability)
 the following sentence also uses commit, to refer to when the jobmanager 
commits metadata to zookeeper.
   > 
   > ```
   > For the JobManager itself to recover consistently, an external service 
must store a minimal 
   > amount of recovery metadata (like “ID of last committed checkpoint”)
   > ```
   > 
   > I'd still really prefer to see something in the configuration name that 
indicates this configuration interval is _only when not checkpointing_. I worry 
/ have observed that people frequently don't read the docs as much as they 
should and then encounter issues. However, I do recognize that we probably 
don't want to make the configuration names really verbose.
   > 
   > What about `flink.iceberg.non-checkpointing-job.sink-flush-interval` or, to 
be more in line with Flink's naming pattern for configurations, 
`flink.iceberg.non.checkpointing.job.sink-commit-interval`? This is more in line 
with Flink's own cluster-level parameter to control the checkpoint interval, 
[execution.checkpointing.interval](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval).
 Additionally, the usage of interval here corresponds nicely to the Flink 
configuration parameter's description for controlling the maximum interval at 
which network buffers should be flushed, 
[execution.buffer-timeout](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-buffer-timeout).
 So possibly a name that mirrors `execution.buffer-timeout`, which controls 
the maximum interval between flushes of (task to task / shuffling) 
network buffers.
   > 
   > Another name could be 
`flink.iceberg.non-checkpointing-sink-buffer-flush-interval`.
   > 
   > Of all of the options I mentioned above, the ones that I am most 
leaning towards, and that I think most clearly explain the parameter, are 
`flink.iceberg.non.checkpointing.job.sink-flush-interval` or 
`flink.iceberg.sink-flush-interval-for-non-checkpointing-jobs`. I think the 
second is the clearest, as it has `sink` right after `iceberg` and has 
`for-non-checkpointing-jobs`. I'd still like to see it documented in the code 
via a comment and even in the Flink docs on the website.
   > 
   > What do you think of these names @simonsssu? I'd love to hear any 
suggestions for alternative config names, especially ones that emphasize that 
this is for non-checkpointing jobs, though I will admit that I think 
`flink.iceberg.sink.commit.interval` adds further confusion. If for example, I 
were a new dev in an organization that used flink, and I was writing my first 
job and copy pasting from another job to create a skeleton, if I saw 
`flink.iceberg.sink.commit.interval` I would assume that this was the one and 
only parameter that controlled when files are committed. But as I understand 
it, files are committed on job checkpoint and then we'll fall back to this 
parameter if the job doesn't checkpoint.
   
   Hi Kyle, a fully spelled-out configuration name would certainly avoid 
confusion, but I don't think it's elegant to define an overly long 
configuration name. So I think we can use 
`flink.iceberg.sink.non-checkpoint-commit-interval` and add documentation that 
tells users about the details. This is just the name I proposed previously with 
`non-checkpoint` added.
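
   To make the intended semantics concrete, here is a minimal sketch of how the 
key could be documented and resolved. It is only an illustration under stated 
assumptions, not the code in this PR: the class name, the `timerCommitInterval` 
helper, the millisecond unit and the 60-second default are all hypothetical.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; none of these names are taken from the PR. */
final class NonCheckpointCommitIntervalSketch {

  /**
   * Interval at which the sink commits files when the job does NOT checkpoint.
   * When checkpointing is enabled, commits are driven by checkpoints and this
   * setting is ignored.
   */
  static final String NON_CHECKPOINT_COMMIT_INTERVAL =
      "flink.iceberg.sink.non-checkpoint-commit-interval";

  // Assumption: a default of 60 seconds, chosen only for this example.
  static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(60);

  private NonCheckpointCommitIntervalSketch() {
  }

  /**
   * Returns the timer-based commit interval, or empty when checkpointing is
   * enabled (in which case commits happen on checkpoint completion).
   * Assumption: the configured value is a number of milliseconds.
   */
  static Optional<Duration> timerCommitInterval(
      boolean checkpointingEnabled, Map<String, String> props) {
    if (checkpointingEnabled) {
      // Checkpoints decide when to commit; the fallback interval is unused.
      return Optional.empty();
    }
    String raw = props.get(NON_CHECKPOINT_COMMIT_INTERVAL);
    if (raw == null) {
      return Optional.of(DEFAULT_INTERVAL);
    }
    return Optional.of(Duration.ofMillis(Long.parseLong(raw.trim())));
  }

  public static void main(String[] args) {
    Map<String, String> props = Map.of(NON_CHECKPOINT_COMMIT_INTERVAL, "5000");
    System.out.println(timerCommitInterval(true, props));
    System.out.println(timerCommitInterval(false, props));
  }
}
```

   With a Javadoc like the one above on the constant, a user who copies the key 
from another job can see in the code that it only applies to jobs without 
checkpointing.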




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
