kbendick commented on a change in pull request #1515:
URL: https://github.com/apache/iceberg/pull/1515#discussion_r495636683



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -57,6 +57,9 @@
   private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
   private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
 
+  public static final String FLINK_ICEBERG_SINK_FLUSHINTERVAL = "flink.iceberg.sink.flushinterval";

Review comment:
       Oh and I have another idea to help ensure that developers follow the 
happy path when configuring their jobs.
   
   We know in the `IcebergFilesCommitter#open` method whether or not this job 
is checkpointing. We set the parameter (whatever it winds up being called) in 
the operator's constructor.
   
   After checking `isCheckpointingEnabled`, could we throw an exception if 
a value is set for the new parameter? That would require not setting the 
default value eagerly in the constructor, and instead setting it lazily in 
`open`. Deferring the lookup is beneficial anyway, given that Flink's 
`Configuration.getLong(String key, long defaultValue)` is deprecated.
   
   Admittedly, my proposed idea still relies on the deprecated method until 
we can define our own `ConfigOption` and then use one of the two recommended 
replacement methods: 
   
   - `public <T> Optional<T> getOptional(ConfigOption<T> option)`
   - `public long getLong(ConfigOption<Long> configOption, long overrideDefault)`
   
   ```java
     @Override
     public void open() throws Exception {
       super.open();
       final boolean isCheckpointEnabled = getRuntimeContext().isCheckpointingEnabled();
       final boolean isCheckpointDisabled = !isCheckpointEnabled;
       // containsKey is the String-keyed lookup on Flink's Configuration.
       final boolean isFlushIntervalSet =
           this.flinkConf.containsKey(FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL);
       // It is considered an error for users to set FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL
       // when checkpointing is enabled, as they likely intended for data flushing to occur at
       // the FLINK_ICEBERG_SINK_FLUSHINTERVAL, but it will instead occur according to the
       // configured checkpointing interval.
       //
       // Note (that shouldn't go in the checked in code) that I'm using the existing config
       // constant name, though I'd still love to see it updated and then update all references
       // to the new name!
       //
       // Please import org.apache.iceberg.exceptions.ValidationException and then use
       // ValidationException.check. I used the fully qualified class name to show you where
       // this can be found (please also do not include this comment in the code).
       //
       // ValidationException.check throws when its first argument is false, so the condition
       // passed in must describe the valid state.
       org.apache.iceberg.exceptions.ValidationException.check(
           !(isCheckpointEnabled && isFlushIntervalSet),
           "Cannot set %s when checkpointing is enabled. It is unlikely this is what is intended. " +
               "Data will be buffered until the next checkpoint, and then flushed and committed. " +
               "To control the rate of flushing to Iceberg, change the checkpointing interval.",
           FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL);
       // If checkpointing is not enabled, use the ProcessingTimeService to trigger commits.
       if (isCheckpointDisabled) {
         this.flushCommitInterval = this.flinkConf.getLong(
             FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL,
             FlinkSink.DEFAULT_FLINK_ICEBERG_SINK_FLUSHINTERVAL);
         ProcessingTimeService processingTimeService = getRuntimeContext().getProcessingTimeService();
         final long currentTimestamp = processingTimeService.getCurrentProcessingTime();
         processingTimeService.registerTimer(currentTimestamp + flushCommitInterval, this);

         // Log to the user what value in processing time will be used for flush / commit, and
         // where it was derived from, as most likely users have just forgotten to enable
         // checkpointing. The {} placeholders assume LOG is an SLF4J logger.
         final String logIntervalFmtStr =
             "Checkpointing is disabled. The IcebergFilesCommitter will flush data files " +
                 "and commit every {} millis as determined by {}.";
         // The default constant is a number, so describe that source with a label instead.
         final String commitIntervalSource = isFlushIntervalSet
             ? FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL
             : "the built-in default";
         LOG.info(logIntervalFmtStr, this.flushCommitInterval, commitIntervalSource);
       }
     }
   ```
   
   I think that we should treat jobs that have checkpointing enabled and also 
specify a Flink Iceberg sink flush / commit interval (whatever we wind up 
naming it) as an error. If we later allow users to specify the commit interval 
for the Iceberg sink independently of the checkpointing interval, then we can 
stop treating it as an error.
   
   I imagine for most users, winding up in a situation where they're not 
checkpointing in a very stateful job and implicitly using the default flush 
interval is probably not what they intended and indicates a bug on their end - 
e.g. they might not know that checkpointing is not enabled (as it's pretty rare 
to disable it in production for jobs that depend on state so much). Thus, I 
think the log will help developers determine if they need to make changes to 
their configuration.
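
To make the rule above easier to unit test, the decision could be pulled out of `open` into a small pure function. The following is only a sketch in plain Java with no Flink or Iceberg dependencies: `FlushIntervalResolver`, `Resolved`, and `resolve` are hypothetical names made up for illustration, `IllegalArgumentException` stands in for `ValidationException`, and the `Optional<Long>` argument stands in for whatever the configuration lookup returns.

```java
import java.util.Optional;

/** Hypothetical helper, not part of Iceberg or Flink: models the proposed rule in plain Java. */
final class FlushIntervalResolver {

  // Same key as FlinkSink.FLINK_ICEBERG_SINK_FLUSHINTERVAL in the diff under review.
  static final String KEY = "flink.iceberg.sink.flushinterval";

  /** The interval to use, plus a label saying where it came from (for the log line). */
  static final class Resolved {
    final long intervalMillis;
    final String source;

    Resolved(long intervalMillis, String source) {
      this.intervalMillis = intervalMillis;
      this.source = source;
    }
  }

  private FlushIntervalResolver() {}

  /**
   * Returns empty when checkpointing drives commits, the timer interval otherwise,
   * and throws when an interval is configured while checkpointing is enabled.
   */
  static Optional<Resolved> resolve(
      boolean checkpointingEnabled, Optional<Long> configured, long defaultMillis) {
    if (checkpointingEnabled) {
      if (configured.isPresent()) {
        throw new IllegalArgumentException(String.format(
            "Cannot set %s when checkpointing is enabled; change the checkpointing interval instead.",
            KEY));
      }
      return Optional.empty();
    }
    // Checkpointing is disabled: the default is applied lazily, only at this point.
    Resolved resolved = configured
        .map(value -> new Resolved(value, KEY))
        .orElseGet(() -> new Resolved(defaultMillis, "built-in default"));
    return Optional.of(resolved);
  }

  public static void main(String[] args) {
    resolve(false, Optional.empty(), 60_000L).ifPresent(r ->
        System.out.printf("Flushing every %d millis as determined by %s.%n", r.intervalMillis, r.source));
  }
}
```

With something like this, `open` would only need to act on the result: register the processing-time timer and log the source when a value is present, and do nothing extra when it is empty.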






