[
https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780105#comment-16780105
]
andy hoang commented on FLINK-11335:
------------------------------------
[~dawidwys]Thanks for reply, I dont see any async commit failed message, whole
list of log will look like below, I also update to flink 1.7.0 and kafka
consumer to latest, still got the same problem.
```
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
```
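For reference: with the 1.7.0 `flink-connector-kafka_2.11` artifact above, the consumer class is the universal `FlinkKafkaConsumer` (the 0.11-specific `FlinkKafkaConsumer011` comes from the separate `flink-connector-kafka-0.11` artifact). A minimal sketch of the consumer wiring against the universal connector, reusing the topic and group id from the code in this issue (the broker address is a placeholder):

```scala
// Sketch only: assumes flink-connector-kafka_2.11 version 1.7.0 on the classpath.
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
properties.setProperty("group.id", "my_groupid")

val consumer = new FlinkKafkaConsumer(
  "my_topic", new JSONKeyValueDeserializationSchema(true), properties)
consumer.setStartFromGroupOffsets()
// Offsets are committed back to Kafka only when a checkpoint completes.
consumer.setCommitOffsetsOnCheckpoints(true)
```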
The warning from `MultipartUploadOutputStream` is quite ambiguous.
The full log is below.
Right after deploying the app:
```
==> container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:42:34,492 INFO org.apache.flink.yarn.YarnResourceManager - Registering TaskManager with ResourceID container_1551327455160_0002_01_000002 (akka.tcp://[email protected]:44931/user/taskmanager_0) at ResourceManager
2019-02-28 04:42:34,693 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) switched from SCHEDULED to DEPLOYING.
2019-02-28 04:42:34,694 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to ip-10-16-1-215
2019-02-28 04:42:35,247 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) switched from DEPLOYING to RUNNING.
2019-02-28 04:42:35,592 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,662 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task 850716ec4421c9e70852a0eba5975f01 of job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,663 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-02-28 04:42:41,484 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:46,555 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
2019-02-28 04:42:46,588 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).
```
Then, after running for a while, there are many log entries like this:
```
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
2019-02-28 04:46:19,327 WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
2019-02-28 04:46:19,329 INFO activity - ################## {"offset":1037251,"topic":"my_topic","partition":11}   <====== my log line, to check whether the offset is reprocessed

==> application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:46:20,134 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 33 @ 1551329180078 for job 40dd9e3dba228623226fcf3bda0d1c0a.

==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
2019-02-28 04:46:22,330 INFO activity - ################## {"offset":1037252,"topic":"my_topic","partition":11}

==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out <==
Right((200,"MESSAGEEEEEE DETAIL"))

==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
2019-02-28 04:46:25,335 INFO activity - ################## {"offset":1037253,"topic":"my_topic","partition":11}
2019-02-28 04:46:25,394 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98

==> application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:46:25,443 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
2019-02-28 04:46:25,549 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
2019-02-28 04:46:26,132 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
```
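The `KafkaFetcher` warning itself is not a failure: the fetcher keeps only the most recent checkpoint's offsets pending for the asynchronous Kafka commit, so a newer offset set simply replaces an older one that the consumer thread has not committed yet. A minimal plain-Scala sketch of that "keep only the newest pending commit" behaviour (hypothetical names, not Flink's internals verbatim):

```scala
import java.util.concurrent.atomic.AtomicReference

// Sketch (not Flink's actual code): a single slot holds the offsets pending
// commit, keyed by partition. When commits are slower than the checkpoint
// interval, a newer offer overwrites an older pending set, which is the case
// that produces the "Skipping commit of previous offsets" warning.
object PendingOffsetCommits {
  private val pending = new AtomicReference[Map[Int, Long]](null)

  // Called when a checkpoint completes; returns true if an older, still
  // uncommitted offset set was replaced (the warning case).
  def offer(offsets: Map[Int, Long]): Boolean =
    pending.getAndSet(offsets) != null

  // Called by the (slow) consumer thread once it is ready to commit.
  def poll(): Option[Map[Int, Long]] = Option(pending.getAndSet(null))
}
```

Only the newest offsets ever reach Kafka, which is why skipping older ones does not compromise checkpoint integrity.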
> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
> Key: FLINK-11335
> URL: https://issues.apache.org/jira/browse/FLINK-11335
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.2
> Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.6.2
> run under yarn-cluster
> Kafka cluster: 1.0
>
> Reporter: andy hoang
> Priority: Critical
>
> When trying to commit offsets to Kafka, I always get the warning:
> {noformat}
> 2019-01-15 11:18:55,405 WARN
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher -
> Committing offsets to Kafka takes longer than the checkpoint interval.
> Skipping commit of previous offsets because newer complete checkpoint offsets
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is that no offsets are ever committed back to Kafka.
> The code was simplified by removing the business logic:
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
> env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
> val properties = new Properties()
> properties.setProperty("group.id", "my_groupid")
> //properties.setProperty("enable.auto.commit", "false")
>
> val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>     new JSONKeyValueDeserializationSchema(true),
>     properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>
> val stream = env.addSource(consumer)
>
> stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
>   override def map(node: ObjectNode): Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
>     logger.info("################## %s".format(node.get("metadata").toString))
>     Thread.sleep(3000)
>     Right((200, writeJs(node.toString)))
>   }
> }).print()
> env.execute("pp_convoy_flink")
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)