Ivan Burmistrov created FLINK-34063:
---------------------------------------
Summary: When snapshot compression is enabled, rescaling of a
source operator leads to some splits getting lost
Key: FLINK-34063
URL: https://issues.apache.org/jira/browse/FLINK-34063
Project: Flink
Issue Type: Bug
Environment: Can be reproduced in any environment. The most important
thing is to enable snapshot compression.
Reporter: Ivan Burmistrov
Attachments: image-2024-01-11-16-27-09-066.png,
image-2024-01-11-16-30-47-466.png
h2. Backstory
We've been experimenting with autoscaling on Flink 1.18 and ran into a pretty
nasty bug.
The symptoms on our production system were as follows. A while after deploying
a job with the autoscaler, it started accumulating Kafka lag, and this could
only be observed via external lag measurement - from inside Flink (measured by
the {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag looked OK:
!image-2024-01-11-16-27-09-066.png|width=887,height=263!
After some digging, it turned out that the job had lost some Kafka partitions -
i.e. it had stopped consuming from them and “forgot” about their existence. That’s why
from Flink’s perspective everything looked fine - the lag was growing on the
partitions Flink no longer knew about.
This was visible on a metric called “Assigned partitions”
(KafkaSourceReader_KafkaConsumer_assigned_partitions):
!image-2024-01-11-16-30-47-466.png|width=1046,height=254!
The chart shows that the job used to know about 20 partitions, and then this
number dropped to 16.
This drop was quickly connected to the job’s scaling events - or, more
precisely, to the scaling of the source operator: with almost 100% probability,
any rescaling of the source operator led to partition loss.
h2. Investigation
We conducted an investigation. We use the latest Kubernetes operator and
deploy jobs in Native Kubernetes mode.
The reproducing scenario we used for investigation:
* Launch a job with source operator parallelism = 4, enable DEBUG logging
* Wait until it takes the first checkpoint
* Scale the source operator up to, say, 5 (no need to wait for autoscaling;
this can be done via the Flink UI)
* Wait until the new checkpoint is taken
* Scale the source operator down to 3
These simple actions led, with almost 100% probability, to some partitions
getting lost.
After that we downloaded all the logs and inspected them, and noticed these
strange records:
{code:java}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
state for 4 split(s) to reader.","service_name":"data-beaver"}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
split(s) to reader:
[
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414,
StoppingOffset: -9223372036854775808],
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538,
StoppingOffset: -9223372036854775808],
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414,
StoppingOffset: -9223372036854775808],
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538,
StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
We see that some task is being restored with 4 splits, but the splits contain
duplicates - in reality only 2 unique partitions have been added
({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
Digging into the code and the logs a bit more, we noticed that log lines like
this one look suspicious:
{code:java}
{"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
"message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state
SubtaskState{operatorStateFromBackend=StateObjectCollection{
[OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
244], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3',
dataBytes=328}},
OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
244], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98',
dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]},
keyedStateFromStream=StateObjectCollection{[]},
inputChannelState=StateObjectCollection{[]},
resultSubpartitionState=StateObjectCollection{[]}, stateSize=656,
checkpointedSize=656} from job manager and local state alternatives [] from
local state store
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@1f89f054.",
"service_name":"data-beaver"}{code}
We see these strange *offsets=[244, 244]* - they look weird.
And this is clearly wrong: when restoring from a snapshot, [this
code|https://github.com/apache/flink/blob/881062f352f8bf8c21ab7cbea95e111fd82fdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L350]
redistributes the offsets into different batches - and all of them will read
the same value.
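To illustrate the consequence, here is a simplified, self-contained sketch (not the actual Flink restore path; the class and partition names are made up) of the same scheme: record an offset per element on write, then seek to each offset and deserialize one element on restore. With correct offsets every element is recovered exactly once; with duplicated offsets the first element is recovered twice and the rest are lost - which matches the duplicated splits in the log above:
{code:java}
import java.io.*;
import java.util.*;

// Simplified illustration (not the actual Flink code) of why duplicated
// partition offsets lose elements on restore.
public class DuplicateOffsetsDemo {

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        // Write two "splits", recording the stream position before each
        // element - the same way PartitionableListState#write records offsets.
        long[] goodOffsets = new long[2];
        goodOffsets[0] = out.size();
        out.writeUTF("partition-6");
        goodOffsets[1] = out.size();
        out.writeUTF("partition-19");

        // With compression enabled getPos() never advances, so both recorded
        // offsets end up pointing at the same position.
        long[] brokenOffsets = {0, 0};

        byte[] data = bytes.toByteArray();
        System.out.println("good:   " + restore(data, goodOffsets));   // [partition-6, partition-19]
        System.out.println("broken: " + restore(data, brokenOffsets)); // [partition-6, partition-6]
    }

    // "Restore": seek to every recorded offset and read exactly one element.
    private static List<String> restore(byte[] data, long[] offsets) throws IOException {
        List<String> result = new ArrayList<>();
        for (long offset : offsets) {
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(data, (int) offset, data.length - (int) offset));
            result.add(in.readUTF());
        }
        return result;
    }
} {code}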
These offsets are produced by
[this|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java#L110]
code:
{code:java}
public long[] write(FSDataOutputStream out) throws IOException {
    long[] partitionOffsets = new long[internalList.size()];
    DataOutputView dov = new DataOutputViewStreamWrapper(out);
    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }
    return partitionOffsets;
} {code}
The actual stream implementation used in this piece of code is
[CompressibleFSDataOutputStream|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java#L30].
At this point we realised that we have snapshot compression enabled
({{execution.checkpointing.snapshot-compression}} = true).
If we take a look at how getPos() is implemented in
CompressibleFSDataOutputStream, we see that getPos() is delegated to the
actual output stream, while writing goes through the compressing delegate:
{code:java}
public CompressibleFSDataOutputStream(
        CheckpointStateOutputStream delegate, StreamCompressionDecorator compressionDecorator)
        throws IOException {
    this.delegate = delegate;
    this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate);
}

@Override
public long getPos() throws IOException {
    return delegate.getPos();
}

@Override
public void write(int b) throws IOException {
    compressingDelegate.write(b);
} {code}
This is incorrect when compression is enabled, because the compressing delegate
doesn't flush data into the actual output stream immediately
([link|https://github.com/xerial/snappy-java/blob/ebfbdead182937463735729bd8fe5f4cd69235e4/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L279]):
{code:java}
@Override
public void write(int b)
        throws IOException
{
    if (closed) {
        throw new IOException("Stream is closed");
    }
    if (buffer.remaining() <= 0) {
        flushBuffer();
    }
    buffer.put((byte) b);
} {code}
Hence, the position in the _delegate_ doesn't get updated, and all offsets end
up being the same.
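This is easy to demonstrate outside of Flink. Below is a minimal sketch (it assumes snappy-java on the classpath - the library behind Flink's snappy stream compression; the class and variable names are ours, not Flink's) that mimics what CompressibleFSDataOutputStream does: it reads the "position" from the raw stream while writing through the compressing one:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.xerial.snappy.SnappyFramedOutputStream;

// Standalone illustration of the getPos() mismatch. The raw stream's size()
// plays the role of delegate.getPos() in CompressibleFSDataOutputStream.
public class CompressedPositionDemo {

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        SnappyFramedOutputStream compressing = new SnappyFramedOutputStream(raw);

        long posBefore = raw.size();       // whatever the compressing stream wrote eagerly (e.g. the stream header)
        compressing.write(new byte[1024]); // goes into the compressing stream's internal block buffer
        long posAfter = raw.size();

        System.out.println(posAfter == posBefore); // true: the raw position did not advance despite the write

        compressing.flush();                       // force the buffered frame out to the raw stream
        System.out.println(raw.size() > posAfter); // true: data only reaches the raw stream on flush/close

        compressing.close();
    }
} {code}
The raw stream's position only moves once the compressing stream flushes a frame, which is exactly why every offset recorded by write() above ends up identical.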
h2. Simplest reproducing scenario
Now that we know the culprit, the following simple reproducing scenario
(verified) can easily be checked locally; a minimal job sketch is included
after the list:
* Create a Kafka topic with, say, 20 partitions
* Launch a job reading from this topic with some parallelism, say 5.
*Important: snapshot compression must be enabled in this job*
* Stop the job with a savepoint
* Restore the job from this savepoint with a different parallelism, say 3.
* Result: some Kafka partitions will no longer be consumed.
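For reference, a minimal sketch of such a job (topic name, bootstrap servers, group id and checkpoint interval below are placeholders, not taken from our setup):
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SnapshotCompressionRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        env.enableCheckpointing(10_000);

        // The important part: snapshot compression, equivalent to setting
        // execution.checkpointing.snapshot-compression: true in the configuration.
        env.getConfig().setUseSnapshotCompression(true);

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("repro-topic-with-20-partitions")
                        .setGroupId("snapshot-compression-repro")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();

        env.execute("snapshot-compression-repro");
    }
} {code}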
--
This message was sent by Atlassian Jira
(v8.20.10#820010)