[https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780356#comment-16780356]
Dawid Wysakowicz commented on FLINK-11335:
------------------------------------------
Are you sure that *no* offsets are committed? Isn't it rather that *some* offsets
stay uncommitted for the checkpoints that either failed or were subsumed?
Also bear in mind that you cannot assume any exactly-once guarantees based on the
offsets committed to Kafka. To provide its delivery guarantees, Flink keeps the
Kafka offsets in its own snapshots, so only if you start/restart your job from a
savepoint/checkpoint may you assume that a single message won't be accounted for
twice in Flink's state. I would generally say that the offsets committed back to
Kafka should be used for monitoring purposes only. I would recommend reading the
Flink Kafka consumer documentation, especially the section about [consumer
fault
tolerance|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance].
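To make the distinction concrete, here is a minimal sketch against the 1.6/1.7-era
connector API (the topic, group id and broker address are placeholders, not taken
from your job):
{code:scala}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Offsets are committed back to Kafka only when a checkpoint completes,
// so the commit interval is bound to the checkpoint interval.
env.enableCheckpointing(60000)

val props = new Properties()
props.setProperty("bootstrap.servers", "broker:9092") // placeholder address
props.setProperty("group.id", "my_groupid")

val consumer = new FlinkKafkaConsumer011[String]("my_topic", new SimpleStringSchema(), props)
// The committed group offsets are a monitoring signal only; on recovery
// Flink restores from the offsets stored in its own checkpoint/savepoint state.
consumer.setCommitOffsetsOnCheckpoints(true)
// Group offsets are consulted only on a fresh start without state.
consumer.setStartFromGroupOffsets()

env.addSource(consumer).print()
env.execute("offset-commit-monitoring-demo")
{code}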
Remember that a message may still be read twice from Kafka, for example if you
cancel the job after some messages were read following the last completed
checkpoint but before the next one completed. That does not mean, though, that it
will be accounted for twice in any computation, or committed twice if you use a
sink that provides exactly-once guarantees.
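If you want a manual restart itself to be duplicate-free, cancel with a savepoint
and resume from it, e.g. (paths, job id and jar name are placeholders):
{noformat}
# take a savepoint while cancelling
bin/flink cancel -s s3://pp-andy-test/savepoints <jobId>
# resume from it, so the offsets come from Flink's state, not from Kafka
bin/flink run -s <savepointPath> your-job.jar
{noformat}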
> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
> Key: FLINK-11335
> URL: https://issues.apache.org/jira/browse/FLINK-11335
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.2
> Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.6.2
> run under yarn-cluster
> Kafka cluster: 1.0
>
> Reporter: andy hoang
> Priority: Critical
>
> When trying to commit offsets to Kafka, I always get this warning:
> {noformat}
> 2019-01-15 11:18:55,405 WARN
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher -
> Committing offsets to Kafka takes longer than the checkpoint interval.
> Skipping commit of previous offsets because newer complete checkpoint offsets
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is that no offsets are committed back to Kafka.
> The code below was simplified by removing the business logic:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
> // checkpoint every 6s, at-least-once mode, at most one checkpoint in flight
> env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
> val properties = new Properties()
> properties.setProperty("group.id", "my_groupid")
> //properties.setProperty("enable.auto.commit", "false")
>
> // commit offsets back to Kafka whenever a checkpoint completes
> val consumer = new FlinkKafkaConsumer011[ObjectNode](
>     "my_topic",
>     new JSONKeyValueDeserializationSchema(true),
>     properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>
> val stream = env.addSource(consumer)
>
> stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
>   override def map(node: ObjectNode): Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
>     logger.info("################## %s".format(node.get("metadata").toString))
>     Thread.sleep(3000) // simulate slow processing
>     Right((200, writeJs(node.toString)))
>   }
> }).print()
>
> env.execute("pp_convoy_flink")
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)