[ 
https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16781469#comment-16781469
 ] 

andy hoang commented on FLINK-11335:
------------------------------------

[~dawidwys], yep, I sented to mailing list. Thanks for your help. About the 
reproducable example, does it need to include the jar file with local env 
(kafka env) so that anyone has current local cluster and local kafka server can 
use it? Do I need to include the script to pump msg into kafka (so that flink 
app can read on?)

In the description, I included all the code that is in main file.

> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.62
> run under yarn-cluster
> Kafka cluster: 1.0
>  
>            Reporter: andy hoang
>            Priority: Critical
>         Attachments: repeated.log
>
>
> When trying to commit offset to kafka, I always get warning
> {noformat}
> 2019-01-15 11:18:55,405 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka takes longer than the checkpoint interval. 
> Skipping commit of previous offsets because newer complete checkpoint offsets 
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is not commiting any message to kafka
> The code was simplified be remove business
> {code:java}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
>     env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>     val properties = new Properties()
>     properties.setProperty("group.id", "my_groupid")
>     //properties.setProperty("enable.auto.commit", "false")
>     val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>       new JSONKeyValueDeserializationSchema(true),
>       
> properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>     val stream = env.addSource(consumer)
>     
>     stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), 
> (Int, ujson.Value)]] {
>       override def map(node:ObjectNode): scala.Either[(Exception, 
> ObjectNode), (Int, ujson.Value)] = {
>           logger.info("################## 
> %s".format(node.get("metadata").toString))
>           Thread.sleep(3000)
>           return Right(200, writeJs(node.toString))
>       }
>     }).print()
>     env.execute("pp_convoy_flink")
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to