[ https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780356#comment-16780356 ]

Dawid Wysakowicz commented on FLINK-11335:
------------------------------------------

Are you sure that *no* offsets are committed? Isn't it just that *some* offsets 
are left uncommitted for the checkpoints that either failed or were subsumed?

Also bear in mind that you cannot assume any exactly-once guarantees based on 
the offsets committed to Kafka. To provide its delivery guarantees, Flink keeps 
the Kafka offsets in its snapshots. So only if you start/restart your job from a 
savepoint/checkpoint may you assume that a single message won't be accounted for 
in Flink's state twice. I would generally say that the offsets committed back to 
Kafka are useful for monitoring purposes only. I would recommend reading the 
Flink Kafka consumer documentation, especially the section about [consumer 
fault 
tolerance|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance].
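
To make that concrete, here is a minimal sketch (not your job: only the topic, 
group id and checkpoint interval are taken from the issue, the broker address is 
a placeholder and the schema is simplified) of a consumer where the offsets 
committed back to Kafka serve monitoring only, while the offsets that matter for 
the guarantees live in the snapshots:

{code:java}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object OffsetsForMonitoringOnly {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The offsets Flink actually relies on are the ones stored in these snapshots.
    env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "broker:9092") // placeholder, not from the issue
    properties.setProperty("group.id", "my_groupid")

    val consumer = new FlinkKafkaConsumer011[String](
      "my_topic", new SimpleStringSchema(), properties)
    // Committing back to Kafka only feeds external lag monitoring;
    // it has no influence on Flink's delivery guarantees.
    consumer.setCommitOffsetsOnCheckpoints(true)
    // Used on a fresh start only; on a restore from a savepoint/checkpoint
    // the snapshotted offsets take precedence over the group offsets in Kafka.
    consumer.setStartFromGroupOffsets()

    env.addSource(consumer).print()
    env.execute("offsets-for-monitoring-only")
  }
}
{code}

When you resume with {{flink run -s <savepoint-or-externalized-checkpoint-path> ...}}, 
the job continues from the offsets in the snapshot, regardless of what was 
committed to the consumer group.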

Remember that it might happen that a message is read twice from Kafka, e.g. if 
you cancel the job after some messages were read following the last completed 
checkpoint but before the next one completed. That does not mean, though, that 
it will be accounted for twice in any computation, or committed twice if you use 
a sink that provides exactly-once guarantees.
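
For illustration only (your job has no Kafka sink, and the output topic and 
broker below are placeholders), a rough sketch of such an exactly-once sink 
using the Kafka 0.11 connector, which writes inside Kafka transactions that are 
committed together with Flink's checkpoints:

{code:java}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceSinkSketch {
  // Attaches a transactional Kafka sink; output topic and broker are placeholders.
  def addExactlyOnceKafkaSink(stream: DataStream[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9092")
    // Must be large enough to cover the longest checkpoint, otherwise Kafka
    // aborts the transaction that is still open when the checkpoint completes.
    props.setProperty("transaction.timeout.ms", "900000")

    val producer = new FlinkKafkaProducer011[String](
      "my_output_topic",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      props,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

    // Records are written inside a Kafka transaction that is committed only when
    // the corresponding checkpoint completes, so a record replayed after a
    // restore is not committed twice downstream.
    stream.addSink(producer)
  }
}
{code}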

> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.6.2
> run under yarn-cluster
> Kafka cluster: 1.0
>  
>            Reporter: andy hoang
>            Priority: Critical
>
> When trying to commit offsets to Kafka, I always get this warning:
> {noformat}
> 2019-01-15 11:18:55,405 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka takes longer than the checkpoint interval. 
> Skipping commit of previous offsets because newer complete checkpoint offsets 
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is that no offsets are committed back to Kafka.
> The code was simplified by removing the business logic:
> {code:java}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
>     env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>     env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>     val properties = new Properties()
>     properties.setProperty("group.id", "my_groupid")
>     //properties.setProperty("enable.auto.commit", "false")
>     val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>       new JSONKeyValueDeserializationSchema(true),
>       properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>     val stream = env.addSource(consumer)
> 
>     stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
>       override def map(node: ObjectNode): scala.Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
>         logger.info("################## %s".format(node.get("metadata").toString))
>         Thread.sleep(3000)
>         return Right(200, writeJs(node.toString))
>       }
>     }).print()
>     env.execute("pp_convoy_flink")
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
