Denis created FLINK-30476:
-----------------------------
Summary: TrackingFsDataInputStream batch tracking issue
Key: FLINK-30476
URL: https://issues.apache.org/jira/browse/FLINK-30476
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.15.3, 1.15.2, 1.15.1
Reporter: Denis
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream}}
wraps the underlying InputStream to count the bytes consumed.
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.Reader}} relies
on this to create batches of data.
{code:java}
while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
    result.add(next);
}
{code}
{{org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream#read(byte[], int, int)}}
contains a bug that can lead to batches of arbitrary size because the
counter ({{remainingInBatch}}) underflows.
{code:java}
public int read(byte[] b, int off, int len) throws IOException {
    remainingInBatch -= len;
    return stream.read(b, off, len);
}
{code}
According to the javadoc, each call to {{stream.read(b, off, len)}} may
return fewer than {{len}} bytes.
{code:java}
Params:
b – the buffer into which the data is read.
off – the start offset in array b at which the data is written.
len – the maximum number of bytes to read.
Returns:
the total number of bytes read into the buffer, or -1 if there is no more data
because the end of the stream has been reached.
{code}
However, the current implementation subtracts the number of bytes that were
requested ({{len}}) rather than the number of bytes actually read.
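One possible correction is to subtract the value returned by the underlying read instead of {{len}}. The block below is a minimal sketch of that idea, not the Flink class or an agreed fix: {{CountingStream}}, {{ShortReadStream}} and {{CountingStreamDemo}} are hypothetical names, and the counter is assumed to be a plain {{int}}.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for TrackingFsDataInputStream; not the Flink class. */
class CountingStream extends InputStream {
    private final InputStream stream;
    private int remainingInBatch;

    CountingStream(InputStream stream, int batchSize) {
        this.stream = stream;
        this.remainingInBatch = batchSize;
    }

    @Override
    public int read() throws IOException {
        int b = stream.read();
        if (b >= 0) {
            remainingInBatch--; // one byte was actually consumed
        }
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = stream.read(b, off, len);
        if (n > 0) {
            remainingInBatch -= n; // count bytes returned, not bytes requested
        }
        return n;
    }

    int remainingInBatch() {
        return remainingInBatch;
    }
}

/** Hypothetical source that returns at most `chunk` bytes per call. */
class ShortReadStream extends InputStream {
    private final InputStream in;
    private final int chunk;

    ShortReadStream(InputStream in, int chunk) {
        this.in = in;
        this.chunk = chunk;
    }

    @Override
    public int read() throws IOException {
        return in.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return in.read(b, off, Math.min(len, chunk));
    }
}

class CountingStreamDemo {
    /** Requests 64 bytes from a source that hands out 10 at a time; returns {bytesRead, remaining}. */
    static int[] run() {
        try (CountingStream s = new CountingStream(
                new ShortReadStream(new ByteArrayInputStream(new byte[100]), 10), 1000)) {
            int n = s.read(new byte[64], 0, 64);
            return new int[] {n, s.remainingInBatch()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

{{CountingStreamDemo.run()}} returns {{{10, 990}}}: only the 10 bytes actually delivered are charged against the batch of 1000, although 64 were requested.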
For example, the S3 Hadoop FS can return fewer than {{len}} bytes from
{{stream.read(b, off, len)}}. This is expected, and readers account for it;
see
{{org.apache.parquet.io.DelegatingSeekableInputStream#readFully(java.io.InputStream, byte[], int, int)}}.
As a result, reading a Parquet file may cause the counter in
{{TrackingFsDataInputStream#read(byte[], int, int)}} to underflow, because the
Parquet reader tries to read a whole (large) Row Group and may call {{read()}}
multiple times. The underflow leads to an unbounded batch size, which may
cause an OOM.
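The arithmetic behind the underflow can be replayed without Flink or Parquet. The sketch below assumes the counter is a 32-bit {{int}} and that the caller uses a readFully-style loop which requests all outstanding bytes on every call. {{UnderflowDemo}} and its parameters are hypothetical and only model the accounting.

```java
class UnderflowDemo {
    /**
     * Replays the buggy accounting for one readFully-style call and returns how
     * many times the int counter wrapped past Integer.MIN_VALUE.
     * `chunk` must be positive, otherwise the loop never finishes.
     */
    static int countWraps(int batchSize, int totalBytes, int chunk) {
        int remainingInBatch = batchSize;
        int left = totalBytes;
        int wraps = 0;
        while (left > 0) {
            int before = remainingInBatch;
            remainingInBatch -= left;        // buggy: subtracts the requested length
            if (remainingInBatch > before) { // subtracting a positive value grew it: wrapped
                wraps++;
            }
            left -= Math.min(chunk, left);   // the source returned only `chunk` bytes
        }
        return wraps;
    }
}
```

With illustrative numbers (a 1 MiB batch, a 128 MiB row group and 64 KiB short reads), {{UnderflowDemo.countWraps(1 << 20, 128 << 20, 64 << 10)}} reports at least one wrap. After a wrap the counter is a large positive value, so a check such as {{hasRemainingInBatch()}} no longer bounds the batch.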
--
This message was sent by Atlassian Jira
(v8.20.10#820010)