[
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720807#comment-17720807
]
Pritam Agarwala commented on FLINK-32038:
-----------------------------------------
[~martijnvisser] I have already gone through it. What I am requesting here is
a change to this default behaviour: if checkpointing is enabled, the
auto-commit setting is ignored and offsets are committed only after
checkpointing.
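
To make the request concrete, here is a minimal sketch of the fallback being asked for. It is not existing Flink code: the class name {{ProposedOffsetCommitModes}} is hypothetical, and the nested enum is only a local stand-in for Flink's {{OffsetCommitMode}} so that the snippet compiles on its own. The only behavioural difference from the current logic is the case where checkpointing is enabled, committing on checkpoints is disabled, and {{enable.auto.commit}} is true.

```java
/**
 * Hypothetical variant of the offset commit mode selection, illustrating the
 * requested fallback. Not part of Flink; names are assumptions for this sketch.
 */
final class ProposedOffsetCommitModes {

    /** Local stand-in for Flink's OffsetCommitMode enum. */
    enum OffsetCommitMode {
        DISABLED,
        ON_CHECKPOINTS,
        KAFKA_PERIODIC
    }

    static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {

        if (enableCheckpointing && enableCommitOnCheckpoint) {
            // unchanged: commit offsets when checkpoints complete
            return OffsetCommitMode.ON_CHECKPOINTS;
        }
        // proposed: in every other case, let the Kafka auto-commit setting decide,
        // including when checkpointing is enabled but commit-on-checkpoint is off
        return enableAutoCommit
                ? OffsetCommitMode.KAFKA_PERIODIC
                : OffsetCommitMode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // print the resulting mode for every combination of the three settings
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf(
                            "checkpointing=%b commitOnCheckpoints=%b autoCommit=%b -> %s%n",
                            checkpointing,
                            commitOnCheckpoint,
                            autoCommit,
                            fromConfiguration(autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

With this variant, offsets committed by the Kafka client would be usable for lag monitoring only; recovery would still rely on the offsets stored in Flink's checkpoints.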
> OffsetCommitMode.Kafka_periodic with checkpointing enabled
> -----------------------------------------------------------
>
> Key: FLINK-32038
> URL: https://issues.apache.org/jira/browse/FLINK-32038
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Runtime / Checkpointing
> Affects Versions: 1.14.6
> Reporter: Pritam Agarwala
> Priority: Major
>
> I need the Kafka consumer lag to build a graph, and it depends on the offsets
> committed to Kafka. Flink commits offsets only after checkpointing, to
> keep them consistent.
> Default behaviour as per the docs:
> If checkpointing is enabled but {{consumer.setCommitOffsetsOnCheckpoints}} is
> set to false, offsets are not committed at all, even if
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false, *shouldn't
> it fall back on {{enable.auto.commit}} to commit offsets regularly,
> since in any case Flink doesn't use the consumer's committed offsets for
> recovery?*
>
> OffsetCommitModes class :
>
> {code:java}
> public class OffsetCommitModes {
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
>         }
>     }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)