[
https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Morávek resolved FLINK-34063.
-----------------------------------
Resolution: Fixed
> When snapshot compression is enabled, rescaling of a source operator leads to
> some splits getting lost
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34063
> URL: https://issues.apache.org/jira/browse/FLINK-34063
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0, 1.18.1
> Environment: Can be reproduced in any environment. The most important
> thing is to enable snapshot compression.
> Reporter: Ivan Burmistrov
> Assignee: David Morávek
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.18.2
>
> Attachments: image-2024-01-11-16-27-09-066.png,
> image-2024-01-11-16-30-47-466.png
>
>
> h2. Backstory
> We've been experimenting with Autoscaling on Flink 1.18 and ran into a
> pretty nasty bug.
> The symptoms on our production system were as follows. Some time after
> deploying a job with the autoscaler, it started accumulating Kafka lag, and this
> could only be observed via external lag measurement - from inside Flink
> (measured by the
> {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK:
> !image-2024-01-11-16-27-09-066.png|width=887,height=263!
> After some digging, it turned out that the job had lost some Kafka partitions
> - i.e. it stopped consuming from them and “forgot” about their existence. That’s
> why from Flink’s perspective everything was fine - the lag was growing on
> the partitions Flink no longer knew about.
> This was visible on a metric called “Assigned partitions”
> (KafkaSourceReader_KafkaConsumer_assigned_partitions):
> !image-2024-01-11-16-30-47-466.png|width=1046,height=254!
> We see on the chart that the job used to know about 20 partitions, and then
> this number dropped to 16.
> This drop was quickly connected to the job’s scaling events. Or, more
> precisely, to the scaling of the source operator - with almost 100%
> probability, any scaling of the source operator led to partition loss.
> h2. Investigation
> We investigated the issue. We use the latest Kubernetes operator and
> deploy jobs with Native Kubernetes.
> The reproduction scenario we used for the investigation:
> * Launch a job with source operator parallelism = 4, enable DEBUG logging
> * Wait until it takes the first checkpoint
> * Scale up the source operator to, say, 5 (no need to wait for autoscaling, it
> can be done via Flink UI)
> * Wait until the new checkpoint is taken
> * Scale down the source operator to 3
> These simple actions led, with almost 100% probability, to some partitions
> being lost.
> After that we downloaded all the logs and inspected them. We noticed these
> strange records in the logs:
> {code:java}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
> state for 4 split(s) to reader.","service_name":"data-beaver"}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
> split(s) to reader:
> [
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414,
> StoppingOffset: -9223372036854775808],
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538,
> StoppingOffset: -9223372036854775808],
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414,
> StoppingOffset: -9223372036854775808],
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538,
> StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
> We see that a task is being restored with 4 splits, but the actual splits
> contain duplicates - in reality only 2 unique partitions have been added
> ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
> After digging into the code and the logs a bit more, log lines like this started
> to look suspicious:
>
> {code:java}
> {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
> "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state
> SubtaskState{operatorStateFromBackend=StateObjectCollection{
> [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
> 244], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3',
> dataBytes=328}},
> OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
> 244], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98',
> dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]},
> keyedStateFromStream=StateObjectCollection{[]},
> inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=656,
> checkpointedSize=656} from job manager and local state alternatives [] from
> local state store
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@1f89f054.",
> "service_name":"data-beaver"}{code}
>
> The offsets *offsets=[244, 244]* look suspicious: both entries are identical.
> And this is clearly wrong, because when restoring from a snapshot, [this
> code|https://github.com/apache/flink/blob/881062f352f8bf8c21ab7cbea95e111fd82fdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L350]
> will redistribute the offsets to different batches - and they will all read the same
> value.
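
To illustrate why identical offsets are harmful on restore, here is a minimal, self-contained sketch. It is not Flink code: the class `DuplicateOffsetsDemo`, its methods, and the split names are hypothetical, and plain `DataOutputStream`/`DataInputStream` stand in for the state serializer. It writes two elements, records where each one starts, and then reads one element at each recorded offset, as a restoring reader would.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DuplicateOffsetsDemo {

    /** Serializes each split and stores its starting byte offset in offsetsOut. */
    static byte[] write(String[] splits, long[] offsetsOut) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int i = 0; i < splits.length; i++) {
            offsetsOut[i] = out.size(); // number of bytes written so far
            out.writeUTF(splits[i]);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Reads exactly one element starting at each recorded offset. */
    static List<String> readAt(byte[] blob, long[] offsets) throws IOException {
        List<String> result = new ArrayList<>();
        for (long offset : offsets) {
            int start = (int) offset;
            DataInputStream in =
                    new DataInputStream(
                            new ByteArrayInputStream(blob, start, blob.length - start));
            result.add(in.readUTF());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String[] splits = {"topic-6", "topic-19"}; // hypothetical split names
        long[] correct = new long[splits.length];
        byte[] blob = write(splits, correct);

        // Simulates the faulty metadata: every element claims the same offset.
        long[] broken = {correct[0], correct[0]};

        System.out.println(readAt(blob, correct)); // [topic-6, topic-19]
        System.out.println(readAt(blob, broken));  // [topic-6, topic-6]
    }
}
```

With the broken offsets, the first element is read twice and the second one is never read, which matches the duplicated splits seen in the log above.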
> These offsets are produced by
> [this|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java#L110]
> code:
> {code:java}
> public long[] write(FSDataOutputStream out) throws IOException {
>     long[] partitionOffsets = new long[internalList.size()];
>     DataOutputView dov = new DataOutputViewStreamWrapper(out);
>     for (int i = 0; i < internalList.size(); ++i) {
>         S element = internalList.get(i);
>         partitionOffsets[i] = out.getPos();
>         getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
>     }
>     return partitionOffsets;
> } {code}
> The actual implementation that’s being used in this piece of code is
> [CompressibleFSDataOutputStream|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java#L30].
> At this point we realised that we had snapshot compression enabled
> (execution.checkpointing.snapshot-compression = true).
> If we look at how getPos() is implemented in
> CompressibleFSDataOutputStream, we see that getPos() is delegated to the
> actual output stream, while writing happens through the compressing delegate:
>
> {code:java}
> public CompressibleFSDataOutputStream(
>         CheckpointStateOutputStream delegate, StreamCompressionDecorator compressionDecorator)
>         throws IOException {
>     this.delegate = delegate;
>     this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate);
> }
>
> @Override
> public long getPos() throws IOException {
>     return delegate.getPos();
> }
>
> @Override
> public void write(int b) throws IOException {
>     compressingDelegate.write(b);
> } {code}
> This is incorrect when compression is enabled, because the compressing delegate
> doesn't flush data into the actual output stream immediately
> ([link|https://github.com/xerial/snappy-java/blob/ebfbdead182937463735729bd8fe5f4cd69235e4/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L279]):
> {code:java}
> @Override
> public void write(int b)
>         throws IOException
> {
>     if (closed) {
>         throw new IOException("Stream is closed");
>     }
>     if (buffer.remaining() <= 0) {
>         flushBuffer();
>     }
>     buffer.put((byte) b);
> } {code}
> Hence, the position in the _delegate_ doesn't get updated, and all offsets
> end up being the same.
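
The effect described above can be reproduced without Flink or Snappy. The following is a rough sketch under stated assumptions: `StalePositionDemo` and `BufferedPositionStream` are hypothetical names, and `java.io.BufferedOutputStream` merely stands in for the buffering compression stream. The position is taken from the raw sink while writes go through the buffering layer, so every recorded offset is the same until something is flushed. The `flushFirst` variant shows one possible remedy, flushing before asking for the position; whether this matches the fix that was merged is not stated in the issue text.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

public class StalePositionDemo {

    /** Hypothetical stand-in: writes are buffered, positions come from the raw sink. */
    static final class BufferedPositionStream extends OutputStream {
        private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();
        private final OutputStream bufferingDelegate =
                new BufferedOutputStream(delegate, 64 * 1024);

        @Override
        public void write(int b) throws IOException {
            bufferingDelegate.write(b);
        }

        @Override
        public void flush() throws IOException {
            bufferingDelegate.flush();
        }

        /** Reports only the bytes that have already reached the raw sink. */
        long getPos() {
            return delegate.size();
        }

        /** Pushes buffered bytes to the raw sink before reporting the position. */
        long getPosAfterFlush() throws IOException {
            flush();
            return delegate.size();
        }
    }

    /** Records the reported position before writing each fixed-size element. */
    static long[] recordOffsets(int elements, int elementSize, boolean flushFirst)
            throws IOException {
        BufferedPositionStream out = new BufferedPositionStream();
        long[] offsets = new long[elements];
        for (int i = 0; i < elements; i++) {
            offsets[i] = flushFirst ? out.getPosAfterFlush() : out.getPos();
            for (int j = 0; j < elementSize; j++) {
                out.write(j);
            }
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(Arrays.toString(recordOffsets(3, 100, false))); // [0, 0, 0]
        System.out.println(Arrays.toString(recordOffsets(3, 100, true)));  // [0, 100, 200]
    }
}
```

With a real compression stream, flushing before every position query would likely reduce compression efficiency, and the reported positions would refer to the compressed byte stream, so the reading side has to be able to start decoding at those positions. Treat this as an illustration of the mechanism rather than a drop-in fix.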
> h2. Simplest reproducing scenario
> Now that we know the culprit, a simple reproduction scenario (verified), which
> can easily be checked locally, is the following:
> * Create a Kafka topic with say 20 partitions
> * Launch a job reading from this topic with some parallelism, say 5.
> *Important: snapshot compression should be enabled in this job*
> * Stop the job with a savepoint
> * Restore the job from this savepoint and pick a different parallelism, say
> 3.
> * Result: some Kafka partitions will not be consumed anymore.
>