[ 
https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-17327:
--------------------------------------

I'm re-opening for now since I think the KafkaConsumer is working as designed, 
i.e. FLINK-16482 is not a bug (though I don't like the exception throwing 
behaviour).

Btw, the Kafka Producer is stuck on a lock, that's why the TM is eventually 
killed:
{code}
2020-05-04 16:43:21,297 WARN  org.apache.flink.runtime.taskmanager.Task         
            - Task 'Map -> Sink: Unnamed (1/1)' did not react to cancelling 
signal for 30 seconds, but is stuck in method:
 sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:698)
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.commitTransaction(FlinkKafkaInternalProducer.java:103)
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:920)
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:98)
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:405)
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:358)
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1042)
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$122/1846623322.run(Unknown
 Source)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
java.lang.Thread.run(Thread.java:748)
{code}

> Kafka unavailability could cause Flink TM shutdown
> --------------------------------------------------
>
>                 Key: FLINK-17327
>                 URL: https://issues.apache.org/jira/browse/FLINK-17327
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Jun Qin
>            Priority: Major
>         Attachments: Standalonesession.log, TM.log, TM_produer_only_task.log
>
>
> Steps to reproduce:
>  # Start a Flink 1.10 standalone cluster
>  # Run a Flink job which reads from one Kafka topic and writes to another 
> topic, with exactly-once checkpointing enabled
>  # Stop all Kafka Brokers after a few successful checkpoints
> When Kafka brokers are down:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # Then, Flink could not complete snapshot due to {{Timeout expired while 
> initializing transactional state in 60000ms}}
>  # After several snapshot failures, Flink reported {{Too many ongoing 
> snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints.}}
>  # Eventually, Flink tried to cancel the task which did not succeed within 3 
> min. According to logs, consumer was cancelled, but producer is still running
>  # Then {{Fatal error occurred while executing the TaskManager. Shutting it 
> down...}}
> I will attach the logs to show the details.  Worth to note that if there 
> would be no consumer but producer only in the task, the behavior is different:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # after {{delivery.timeout.ms}} (2min by default), producer reports: 
> {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for 
> output-topic-0:120001 ms has passed since batch creation}}
>  # Flink tried to cancel the upstream tasks and created a new producer
>  # The new producer obviously reported connectivity issue to brokers
>  # This continues till Kafka brokers are back. 
>  # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool 
> size or decrease number of concurrent checkpoints.}}
>  # Flink cancelled the tasks and restarted them
>  # The job continues, and new checkpoint succeeded. 
>  # TM runs all the time in this scenario
> I set Kafka transaction time out to 1 hour just to avoid transaction timeout 
> during the test.
> To get a producer only task, I called {{env.disableOperatorChaining();}} in 
> the second scenario. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to