macdoor615 created FLINK-39484:
----------------------------------
Summary: NativeS3InputStream: abort unfinished GetObject responses
on seek/reopen to avoid ConnectionClosedException on S3-compatible storage
Key: FLINK-39484
URL: https://issues.apache.org/jira/browse/FLINK-39484
Project: Flink
Issue Type: Bug
Components: FileSystems
Affects Versions: 2.3.0
Environment: * Apache Flink: 2.3.0 / Paimon 1.3.1
* Storage: MinIO
* Workload: Paimon catalog reading large Parquet files; high parallelism (many
concurrent readers)
* JVM: Java 21
* Relevant Flink S3 settings (examples): s3.connection.timeout,
s3.socket.timeout, s3.read.buffer.size, s3.retry.max-num-retries
Reporter: macdoor615
Fix For: 2.3.0
Attachments: flink-dict-taskexecutor-0-cmtt-dict-57.log
h2. Problem
When reading large Parquet files (e.g. via Paimon) from S3-compatible endpoints
such as MinIO using the {{flink-s3-fs-native}} plugin, tasks can fail with:
{code:java}
Caused by:
org.apache.flink.fs.s3native.shaded.org.apache.http.ConnectionClosedException:
Premature end of Content-Length delimited message body (expected: 41,228,206;
received: 15,528,875)
{code}
The stack trace points to closing the buffered stream inside
{{NativeS3InputStream.close()}} while Apache HttpClient tries to drain the
remainder of the HTTP response body.
Parquet access patterns use {{seek()}} on {{FSDataInputStream}}.
{{NativeS3InputStream}} issues ranged GETs and replaces the underlying
{{GetObject}} stream. The previous implementation closed the old stream without
consuming the full response body. With AWS SDK v2 and the Apache HTTP client,
{{close()}} attempts to read the remaining bytes so that the connection can be
reused. If the server or network has already terminated the connection,
draining fails with the exception above.
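The failure mode can be modeled without the AWS SDK. The sketch below is an illustration only: {{TruncatedBodyStream}} and {{DrainOnCloseDemo}} are hypothetical stand-ins, not classes from Flink, the SDK, or HttpClient. It assumes a {{close()}} that drains the body, and a connection that ends before the announced Content-Length has arrived.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for an HTTP response body whose connection dies early. */
class TruncatedBodyStream extends InputStream {
    private final InputStream delivered; // bytes the server actually sent
    private final long expected;         // Content-Length announced by the server
    private long received;

    TruncatedBodyStream(byte[] deliveredBytes, long expected) {
        this.delivered = new ByteArrayInputStream(deliveredBytes);
        this.expected = expected;
    }

    @Override
    public int read() throws IOException {
        int b = delivered.read();
        if (b >= 0) {
            received++;
            return b;
        }
        if (received < expected) {
            // Models the connection ending before Content-Length bytes arrived.
            throw new IOException("Premature end of Content-Length delimited message body"
                    + " (expected: " + expected + "; received: " + received + ")");
        }
        return -1;
    }

    /** Models a draining close(): read to the end so the connection could be reused. */
    @Override
    public void close() throws IOException {
        while (read() >= 0) {
            // discard remaining bytes
        }
    }
}

class DrainOnCloseDemo {
    /** Reads a few bytes, then closes early; returns the error message, or null if none. */
    static String readPartiallyThenClose(InputStream in, int bytesToRead) {
        try {
            for (int i = 0; i < bytesToRead; i++) {
                in.read();
            }
            in.close(); // the drain happens here, not in the reads above
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // 16 bytes delivered out of an announced 64; the caller only wanted 4.
        InputStream in = new TruncatedBodyStream(new byte[16], 64);
        System.out.println(readPartiallyThenClose(in, 4));
    }
}
```

In this model the reads the caller asked for succeed. The exception only surfaces in {{close()}}, which matches the stack trace described above.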
h2. Expected behavior
Abandoning a partially read {{GetObject}} body (after seek/reopen or early
close) should use {{ResponseInputStream.abort()}} per AWS SDK guidance, instead
of relying on {{close()}} to drain the body.
h2. Suggested fix
* On reopen ({{openStreamAtCurrentPosition}}) and on {{close()}} when
{{position < contentLength}}, call {{abort()}} on the
{{ResponseInputStream}} before discarding wrappers.
* Optionally document that {{abort()}} may reduce connection reuse compared to
fully draining after a complete sequential read.
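A minimal sketch of the decision described in the list above, written against stand-in types so it runs without the AWS SDK. {{AbortableBody}}, {{RecordingBody}} and {{StreamRelease.release}} are hypothetical names introduced for illustration; in the real plugin the stream would be the SDK's {{ResponseInputStream}}, and the actual patch may be structured differently.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for the abort() capability of a GetObject response stream. */
interface AbortableBody {
    void abort();
}

/** Hypothetical stand-in stream that records how it was released. */
class RecordingBody extends InputStream implements AbortableBody {
    boolean aborted;
    boolean closed;

    @Override
    public int read() {
        return -1; // body content is irrelevant to this sketch
    }

    @Override
    public void abort() {
        aborted = true; // a real client would drop the connection without draining
    }

    @Override
    public void close() {
        closed = true; // a real client may try to drain the remaining body here
    }
}

class StreamRelease {
    /**
     * Releases the current body stream. If the object has not been read to the
     * end, abort first so that close() has nothing left to drain.
     */
    static <T extends InputStream & AbortableBody> void release(
            T body, long position, long contentLength) throws IOException {
        if (body == null) {
            return;
        }
        if (position < contentLength) {
            body.abort();
        }
        body.close();
    }
}
```

The same helper would be called from both the reopen path and {{close()}}. After a complete sequential read it skips {{abort()}}, which leaves the connection eligible for reuse.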
h2. Notes
This is independent of tuning {{s3.socket.timeout}} /
{{s3.connection.timeout}}; the failure occurs during response teardown, not
only during slow reads.