kbendick commented on a change in pull request #1515:
URL: https://github.com/apache/iceberg/pull/1515#discussion_r495636683
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -57,6 +57,9 @@
private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+ public static final String FLINK_ICEBERG_SINK_FLUSHINTERVAL = "flink.iceberg.sink.flushinterval";
Review comment:
Oh, and I have another idea to help ensure that developers follow the happy path when configuring their jobs.
We know in the `IcebergFilesCommitter#open` method whether or not this job is checkpointing. We set the parameter (whatever it winds up being called) in the operator's constructor.
After checking `isCheckpointingEnabled`, could we throw an exception if a value is set for the new parameter? That would require us not to eagerly set the default value in the constructor, and instead to set it lazily in `open` as well. This is beneficial anyway, given that Flink's `Configuration.getLong(String key, long defaultValue)` is deprecated.
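A minimal plain-Java sketch of the lazy approach, assuming the job configuration can be modeled as a `Map<String, String>`. The class `LazyFlushIntervalSketch`, its `open(boolean)` signature, the `-1` sentinel and the 60-second default are all hypothetical stand-ins, not the real Flink or Iceberg APIs, and `IllegalArgumentException` stands in for Iceberg's `ValidationException`. It illustrates why the default must not be applied in the constructor: once `getLong(key, default)` has run, `open` can no longer tell "unset" apart from "set to the default".

```java
import java.util.Map;

/** Hypothetical stand-in for the committer; not the real IcebergFilesCommitter. */
public class LazyFlushIntervalSketch {
  static final String FLUSH_INTERVAL_KEY = "flink.iceberg.sink.flushinterval";
  // Hypothetical default, in milliseconds.
  static final long DEFAULT_FLUSH_INTERVAL_MS = 60_000L;

  private final Map<String, String> conf;
  // -1 means "not used": checkpoints drive flushing and committing.
  private long flushCommitInterval = -1L;

  public LazyFlushIntervalSketch(Map<String, String> conf) {
    // Keep the raw configuration only. Resolving the default here would erase
    // whether the user actually set the key.
    this.conf = conf;
  }

  public long open(boolean checkpointingEnabled) {
    boolean userSetInterval = conf.containsKey(FLUSH_INTERVAL_KEY);
    if (checkpointingEnabled && userSetInterval) {
      // Stand-in for ValidationException.check(...) in the real operator.
      throw new IllegalArgumentException(
          "Cannot set " + FLUSH_INTERVAL_KEY + " when checkpointing is enabled");
    }
    if (!checkpointingEnabled) {
      // The default is resolved lazily, only on the processing-time path.
      flushCommitInterval = userSetInterval
          ? Long.parseLong(conf.get(FLUSH_INTERVAL_KEY))
          : DEFAULT_FLUSH_INTERVAL_MS;
    }
    return flushCommitInterval;
  }

  public static void main(String[] args) {
    System.out.println(new LazyFlushIntervalSketch(Map.of()).open(false));
    System.out.println(new LazyFlushIntervalSketch(Map.of(FLUSH_INTERVAL_KEY, "5000")).open(false));
    System.out.println(new LazyFlushIntervalSketch(Map.of()).open(true));
  }
}
```

Running `main` prints `60000`, `5000` and `-1`; passing the key together with `open(true)` throws.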
Admittedly, my proposed idea still relies on the deprecated method until such time as we can define our own `ConfigOption` and then use one of the two recommended replacement methods:
- `public <T> Optional<T> getOptional(ConfigOption<T> option)`
- `public long getLong(ConfigOption<Long> configOption, long overrideDefault)`
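The `Optional`-returning lookup removes the need for a separate `contains` call: one lookup says both whether the key was set and what it was set to. The sketch below mimics that shape in plain Java, assuming a `Map<String, String>` in place of Flink's `Configuration`; `getOptionalLong` and the 60-second default are hypothetical, not part of Flink.

```java
import java.util.Map;
import java.util.Optional;

public class OptionalLookupSketch {
  static final String FLUSH_INTERVAL_KEY = "flink.iceberg.sink.flushinterval";
  static final long DEFAULT_FLUSH_INTERVAL_MS = 60_000L; // hypothetical default

  // Hypothetical helper with the same shape as Configuration#getOptional(ConfigOption<T>):
  // empty when the key is unset, otherwise the parsed value. No default is baked in.
  static Optional<Long> getOptionalLong(Map<String, String> conf, String key) {
    return Optional.ofNullable(conf.get(key)).map(Long::parseLong);
  }

  public static void main(String[] args) {
    Optional<Long> unset = getOptionalLong(Map.of(), FLUSH_INTERVAL_KEY);
    Optional<Long> set = getOptionalLong(Map.of(FLUSH_INTERVAL_KEY, "5000"), FLUSH_INTERVAL_KEY);

    // One lookup answers "did the user set it?" for the validation check...
    System.out.println(unset.isPresent() + " " + set.isPresent());
    // ...and still yields the effective value, with the default applied only at the call site.
    System.out.println(unset.orElse(DEFAULT_FLUSH_INTERVAL_MS) + " " + set.orElse(DEFAULT_FLUSH_INTERVAL_MS));
  }
}
```

This prints `false true` and then `60000 5000`. With a real `ConfigOption<Long>`, the analogous call would be `flinkConf.getOptional(option)`, whose result could drive both the check and the defaulting in the same way.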
```java
@Override
public void open() throws Exception {
  super.open();

  final boolean isCheckpointEnabled = getRuntimeContext().isCheckpointingEnabled();
  final boolean isCheckpointDisabled = !isCheckpointEnabled;

  // It is considered an error for users to set FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL
  // when checkpointing is enabled, as they likely intended for data flushing to occur at
  // the FLINK_ICEBERG_SINK_FLUSHINTERVAL, but it will instead occur according to the
  // configured checkpointing interval.
  //
  // Note (that shouldn't go in the checked-in code) that I'm using the existing config constant name,
  // though I'd still love to see it updated and then update all references to the new name!
  //
  // Please import org.apache.iceberg.exceptions.ValidationException and then use ValidationException.check.
  // I used the fully qualified class name to show you where this can be found (please also do not include
  // this comment in the code).
  //
  // ValidationException.check throws when its condition is false, so the condition states what is
  // allowed: either checkpointing is disabled, or the flush interval was not set.
  org.apache.iceberg.exceptions.ValidationException.check(
      isCheckpointDisabled || !this.flinkConf.contains(FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL),
      "Cannot set %s when checkpointing is enabled. It is unlikely this is what is intended. " +
          "Data will be buffered until the next checkpoint, and then flushed and committed. " +
          "To control the rate of flushing to Iceberg, change the checkpointing interval.",
      FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL);

  // If checkpointing is disabled, use the ProcessingTimeService to schedule flushes and commits.
  if (isCheckpointDisabled) {
    this.flushCommitInterval = this.flinkConf.getLong(
        FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL,
        FlinkSink.DEFAULT_FLINK_ICEBERG_SINK_FLUSHINTERVAL);

    ProcessingTimeService processingTimeService = getRuntimeContext().getProcessingTimeService();
    final long currentTimestamp = processingTimeService.getCurrentProcessingTime();
    processingTimeService.registerTimer(currentTimestamp + flushCommitInterval, this);

    // Log to the user what value in processing time will be used for flush / commit, and where it was
    // derived from, as most likely users have just forgotten to enable checkpointing.
    // SLF4J uses {} placeholders rather than String.format-style %d / %s.
    final String logIntervalFmtStr =
        "Checkpointing is disabled. The IcebergFilesCommitter will flush data files and commit " +
            "every {} millis as determined by {}.";
    // Both branches must be Strings; describe the default by name rather than using its numeric value.
    final String commitIntervalSource =
        this.flinkConf.contains(FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL)
            ? FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL
            : "the default (FlinkSink.DEFAULT_FLINK_ICEBERG_SINK_FLUSHINTERVAL)";
    LOG.info(logIntervalFmtStr, this.flushCommitInterval, commitIntervalSource);
  }
}
```
I think we should treat jobs that have checkpointing enabled and also specify a Flink Iceberg sink flush / commit interval (whatever we wind up naming it) as an error. If a time comes when we allow users to specify the commit interval for the Iceberg sink independently of the checkpointing interval, then we can stop treating it as an error.
For most users, I imagine that ending up in a situation where a very stateful job is not checkpointing, and is implicitly using the default flush interval, is not what they intended and indicates a bug on their end: for example, they might not know that checkpointing is disabled (it's rare to disable it in production for jobs that depend so heavily on state). Thus, I think the log message will help developers determine whether they need to change their configuration.