[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084591#comment-17084591 ]

Vasii Cosmin Radu commented on FLINK-17170:
-------------------------------------------

Thanks guys for looking into this, [~klion26] [~yunta]. I've attached a full 
thread dump from one of the task managers (file "threaddump_tm1").

 

I've seen that the shard consumers are waiting for the checkpoint lock, but 
the checkpoint lock is already held by the thread performing the stop.

 

Some relevant parts of the thread dump:

{code:java}
shardConsumers-Source: source -> Sink: sink (12/12)-thread-0 priority:5 - threadId:0x00007f7c7c298000 - nativeId:0x774c - nativeId (decimal):30540 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:774)
- waiting to lock <0x00000007abe026c8> (a java.lang.Object)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:92)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:273)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:288)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:285)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:760)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

But the lock 0x00000007abe026c8 is already held here:
{code:java}
Source: source -> Sink: sink (12/12) priority:5 - threadId:0x00007f7c2c0cf000 - nativeId:0x76fe - nativeId (decimal):30462 - state:TIMED_WAITING
stackTrace:
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007ac041158> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:637)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:365)
at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:147)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$838/1354138988.run(Unknown Source)
at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
at org.apache.flink.util.function.FunctionUtils$$Lambda$839/1349808364.call(Unknown Source)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
- locked <0x00000007abe026c8> (a java.lang.Object)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
 

So, the FlinkKinesisConsumer#close() method is called, which delegates to 
FlinkKinesisConsumer#cancel(); then KinesisDataFetcher#shutdownFetcher() is 
called and apparently executes successfully, because I don't have any 
errors in the logs. Had there been an exception, the message from 
FlinkKinesisConsumer line 367, "Error while closing Kinesis data fetcher", 
would have appeared in my logs. Then fetcher.awaitTermination() is called, 
and KinesisDataFetcher#awaitTermination waits for 1 minute. But there isn't 
any Thread._setDefaultUncaughtExceptionHandler_ set, so if there's an 
exception in the shardConsumersExecutor, I can't really see it. Side 
question: can I set the UncaughtExceptionHandler somehow?
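
For reference, here is a plain-JDK sketch (hypothetical stand-alone class, not Flink API; Flink creates the shard-consumer pool internally, so this would have to be wired in by the connector itself) of how an UncaughtExceptionHandler can be installed on executor threads through a custom ThreadFactory:
{code:java}
import java.util.concurrent.*;

// Hypothetical sketch: installing an UncaughtExceptionHandler on pool
// threads via a custom ThreadFactory, so task failures become visible.
public class HandlerDemo {

    // Runs a failing task on a single-thread pool and returns the message
    // captured by the uncaught-exception handler (null if nothing arrived).
    static String runFailingTask() throws InterruptedException {
        BlockingQueue<String> captured = new ArrayBlockingQueue<>(1);
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable);
            t.setUncaughtExceptionHandler((thread, e) -> captured.offer(e.getMessage()));
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        // execute(), not submit(): submit() wraps the task in a FutureTask,
        // which stores the exception in the Future and the handler never fires.
        pool.execute(() -> { throw new RuntimeException("boom"); });
        pool.shutdown();
        return captured.poll(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handler saw: " + runFailingTask());
    }
}
{code}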

 

To me it is quite obvious that the shardConsumersExecutor is not shut down, 
and from my JDK knowledge, calling shutdownNow() on an executor service does 
not guarantee that it will actually terminate. And since multiple threads are 
competing for the checkpoint lock, I guess the threads from the 
shardConsumersExecutor must be interrupted. How come it works when doing a 
cancel? Is there a forced interrupt being done?
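
The shutdownNow() point can be reproduced with a small stand-alone sketch (hypothetical names, not Flink code) mirroring the two thread dumps above: the caller holds a lock while awaiting termination, and the worker is BLOCKED on that same lock. The interrupt delivered by shutdownNow() cannot wake a thread parked on a synchronized monitor, because monitor acquisition is not interruptible, so awaitTermination() times out:
{code:java}
import java.util.concurrent.*;

// Hypothetical sketch of the deadlock: the "stop" thread holds a lock while
// awaiting executor termination; the worker is BLOCKED on that same lock.
public class ShutdownNowDemo {

    // Returns whether the pool terminated; false here, because the interrupt
    // from shutdownNow() cannot wake a thread blocked on a synchronized monitor.
    static boolean tryShutdown() throws InterruptedException {
        Object lock = new Object();  // stand-in for the checkpoint lock
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        synchronized (lock) {  // the stopping thread holds the lock...
            pool.execute(() -> {
                started.countDown();
                synchronized (lock) { }  // ...so the worker parks here, state BLOCKED
            });
            started.await();
            Thread.sleep(100);           // give the worker time to reach the monitor
            pool.shutdownNow();          // delivers an interrupt the worker never observes
            return pool.awaitTermination(1, TimeUnit.SECONDS);  // times out
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("terminated: " + tryShutdown());
    }
}
{code}
Under this model, awaitTermination() can only succeed if the lock is released before waiting on the pool.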

 

> Cannot stop streaming job with savepoint which uses kinesis consumer
> --------------------------------------------------------------------
>
>                 Key: FLINK-17170
>                 URL: https://issues.apache.org/jira/browse/FLINK-17170
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kinesis
>    Affects Versions: 1.10.0
>            Reporter: Vasii Cosmin Radu
>            Priority: Major
>         Attachments: Screenshot 2020-04-15 at 18.16.26.png, threaddump_tm1
>
>
> I am encountering a very strange situation where I can't stop a streaming 
> job with a savepoint.
> The job reads from kinesis and sinks to S3, very simple job, no mapping 
> function, no watermarks, just source->sink. 
> Source is using flink-kinesis-consumer, sink is using StreamingFileSink. 
> Everything works fine, except stopping the job with savepoints.
> The behaviour happens only when multiple task managers are involved, with 
> sub-tasks of the job spread across multiple task manager instances. When a 
> single task manager has all the sub-tasks, this issue never occurred.
> Using latest Flink 1.10.0 version, deployment done in HA mode (2 job 
> managers), in EC2, savepoints and checkpoints written on S3.
> When trying to stop, the savepoint is created correctly and appears on S3, 
> but not all sub-tasks are stopped. Some of them finish, but some just 
> remain hung. Sometimes, on the same task manager, part of the sub-tasks are 
> finished and part aren't.
> The logs don't show any errors. For the ones that succeed, the standard 
> messages appear, with "Source: <....> switched from RUNNING to FINISHED".
> For the sub-tasks hanged the last message is 
> "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - 
> Shutting down the shard consumer threads of subtask 0 ..." and that's it.
>  
> I tried using the CLI (flink stop <job_id>).
> Timeout Message:
> {code:java}
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint.
> ------------------------------------------------------------
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25".
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more{code}
>  
> Using the monitoring API, I keep getting the same response indefinitely 
> when querying by the savepoint trigger id: the status is still "IN_PROGRESS".
>  
> When performing a cancel instead of stop, it works. But cancel is deprecated, 
> so I am a bit concerned that this might fail also, maybe I was just lucky.
>  
> I attached a screenshot showing what the UI displays when this happens.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
