[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156763#comment-16156763
 ] 

Steve Loughran commented on FLINK-7589:
---------------------------------------

well, you are allowed to file bug reports. 

However, it's not the s3 client getting GC'd, because the s3 client is retained 
for the lifespan of the FileSystem instance, so unless you are disposing of 
that, its retained.

I'd blame network connectivity: the connection was closed, you got back less 
data than you asked for. 

The s3a client does a single retry here, but it could be more sophisticated 
(HADOOP-14531). 

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7589
>                 URL: https://issues.apache.org/jira/browse/FLINK-7589
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.3.2
>            Reporter: Bowen Li
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>       at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>       at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>       at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>       at java.io.DataInputStream.read(DataInputStream.java:149)
>       at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>       at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>       at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>       at java.io.DataInputStream.readFully(DataInputStream.java:195)
>       at java.io.DataInputStream.readLong(DataInputStream.java:416)
>       at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>       at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>       at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>       at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to