[ 
https://issues.apache.org/jira/browse/FLINK-4660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-4660:
--------------------------------
    Description: 
Flink job with checkpoints enabled and configured to use S3A file system 
backend, sometimes experiences checkpointing failure due to S3 consistency 
issue. This behavior is also reported by other people and documented in 
https://issues.apache.org/jira/browse/FLINK-4218.

This problem gets magnified by current HadoopFileSystem implementation, which 
can potentially leak S3 client connections, and eventually get into a 
restarting loop with “Timeout waiting for a connection from pool” exception 
thrown from aws client.
I looked at the code, seems HadoopFileSystem.java never invoke close() method 
on fs object upon failure, but the FileSystem may be re-initialized every time 
the job gets restarted.
A few evidence I observed:
1. When I set the connection pool limit to 128, and below commands shows 128 
connections are stuck in CLOSE_WAIT state.

2. task manager logs indicates that state backend file system consistently 
getting initialized upon job restarting.

3. Log indicates there is NPE during cleanning up of stream task which was 
caused by “Timeout waiting for connection from pool” exception when trying to 
create a directory in S3 bucket.
2016-09-02 08:17:50,886 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of 
stream task
java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
at java.lang.Thread.run(Thread.java:745)
4.It appears StreamTask from invoking checkpointing operation, to handling 
failure, there is no logic associated with closing Hadoop File System object 
(which internally includes S3 aws client object), which resides in 
HadoopFileSystem.java.

  was:
Flink job with checkpoints enabled and configured to use S3A file system 
backend, sometimes experiences checkpointing failure due to S3 consistency 
issue. This behavior is also reported by other people and documented in 
https://issues.apache.org/jira/browse/FLINK-4218.
This problem gets magnified by current HadoopFileSystem implementation, which 
can potentially leak S3 client connections, and eventually get into a 
restarting loop with “Timeout waiting for a connection from pool” exception 
thrown from aws client.
I looked at the code, seems HadoopFileSystem.java never invoke close() method 
on fs object upon failure, but the FileSystem may be re-initialized every time 
the job gets restarted.
A few evidence I observed:
1. When I set the connection pool limit to 128, and below commands shows 128 
connections are stuck in CLOSE_WAIT state.

2. task manager logs indicates that state backend file system consistently 
getting initialized upon job restarting.

3. Log indicates there is NPE during cleanning up of stream task which was 
caused by “Timeout waiting for connection from pool” exception when trying to 
create a directory in S3 bucket.
2016-09-02 08:17:50,886 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of 
stream task
java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
at java.lang.Thread.run(Thread.java:745)
4.It appears StreamTask from invoking checkpointing operation, to handling 
failure, there is no logic associated with closing Hadoop File System object 
(which internally includes S3 aws client object), which resides in 
HadoopFileSystem.java.


> HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in 
> a restarting loop
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4660
>                 URL: https://issues.apache.org/jira/browse/FLINK-4660
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Zhenzhong Xu
>            Priority: Critical
>
> Flink job with checkpoints enabled and configured to use S3A file system 
> backend, sometimes experiences checkpointing failure due to S3 consistency 
> issue. This behavior is also reported by other people and documented in 
> https://issues.apache.org/jira/browse/FLINK-4218.
> This problem gets magnified by current HadoopFileSystem implementation, which 
> can potentially leak S3 client connections, and eventually get into a 
> restarting loop with “Timeout waiting for a connection from pool” exception 
> thrown from aws client.
> I looked at the code, seems HadoopFileSystem.java never invoke close() method 
> on fs object upon failure, but the FileSystem may be re-initialized every 
> time the job gets restarted.
> A few evidence I observed:
> 1. When I set the connection pool limit to 128, and below commands shows 128 
> connections are stuck in CLOSE_WAIT state.
> 2. task manager logs indicates that state backend file system consistently 
> getting initialized upon job restarting.
> 3. Log indicates there is NPE during cleanning up of stream task which was 
> caused by “Timeout waiting for connection from pool” exception when trying to 
> create a directory in S3 bucket.
> 2016-09-02 08:17:50,886 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask - Error during cleanup of 
> stream task
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.cleanup(OneInputStreamTask.java:73)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:323)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:589)
> at java.lang.Thread.run(Thread.java:745)
> 4.It appears StreamTask from invoking checkpointing operation, to handling 
> failure, there is no logic associated with closing Hadoop File System object 
> (which internally includes S3 aws client object), which resides in 
> HadoopFileSystem.java.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to