[
https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780375#comment-16780375
]
andy hoang commented on FLINK-11335:
------------------------------------
[^repeated.log]
[~dawidwys] I'm pretty sure no offset is commited. The log file above was
specified only in partition 19, as you see, between 2 application ,
container_1551327455160_0002_01_000002 and
container_1551327455160_0002_01_000003, the app was reading the same message
again.
I understand that, there're no guarantees the msg in only processed one time,
and by starting a job from savepoint we can make sure it would processed one
time. But from this particular case, I would love to have this testcase only
make sure that when checkpoint happened, it can commit kafka offset back to
kafka.
I'm digging every example code I could to make sure that I didn't do anything
wrong, but there're really lacking of sample that is using real stateBackend
such as aws S3
Here how i deploy the script on AWS EMR
{code:java}
flink run -m yarn-cluster -yid application_1551327455160_0002
pp_flink_convoy-1.1.jar{code}
wait a few minutes
{code:java}
flink cancel 6b109a9f7c5d3cab40a31118a128c1e6 -yid
application_1551327455160_0002 # 6b109a9f7c5d3cab40a31118a128c1e6 is the task
id{code}
run it again
{code:java}
flink run -m yarn-cluster -yid application_1551327455160_0002
pp_flink_convoy-1.1.jar{code}
> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
> Key: FLINK-11335
> URL: https://issues.apache.org/jira/browse/FLINK-11335
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.2
> Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.62
> run under yarn-cluster
> Kafka cluster: 1.0
>
> Reporter: andy hoang
> Priority: Critical
> Attachments: repeated.log
>
>
> When trying to commit offset to kafka, I always get warning
> {noformat}
> 2019-01-15 11:18:55,405 WARN
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher -
> Committing offsets to Kafka takes longer than the checkpoint interval.
> Skipping commit of previous offsets because newer complete checkpoint offsets
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is not commiting any message to kafka
> The code was simplified be remove business
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
> env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> val properties = new Properties()
> properties.setProperty("group.id", "my_groupid")
> //properties.setProperty("enable.auto.commit", "false")
> val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
> new JSONKeyValueDeserializationSchema(true),
>
> properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
> val stream = env.addSource(consumer)
>
> stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode),
> (Int, ujson.Value)]] {
> override def map(node:ObjectNode): scala.Either[(Exception,
> ObjectNode), (Int, ujson.Value)] = {
> logger.info("##################
> %s".format(node.get("metadata").toString))
> Thread.sleep(3000)
> return Right(200, writeJs(node.toString))
> }
> }).print()
> env.execute("pp_convoy_flink")
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)