[ 
https://issues.apache.org/jira/browse/FLINK-39162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-39162:
--------------------------------------
    Affects Version/s: 1.12.7

> The data of the custom partitioner is corrupted when unaligned checkpoint is 
> enabled
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-39162
>                 URL: https://issues.apache.org/jira/browse/FLINK-39162
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.7
>            Reporter: Rui Fan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0
>
>
> While working on FLINK-39140 to improve the unaligned checkpoint ITCases, 
> I encountered a high frequency of test failures for the test case 
> Topology.CUSTOM_PARTITIONER [1]:
>  
> {code:java}
> Caused by: java.io.IOException: Serializer consumed more bytes than the 
> record had. This indicates broken serialization. If you are using custom 
> serialization types (Value or Writable), check their serialization methods. 
> If you are using a Kryo-serialized type, check the corresponding Kryo 
> serializer.
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:130)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:159)
>     ... 10 more
> Caused by: java.lang.IndexOutOfBoundsException
>     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:389)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readByte(NonSpanningWrapper.java:112)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:201)
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> {code}
>  
> It can be reproduced reliably when upscaling with the custom partitioner.
> Based on current testing, this is highly likely a bug in the production 
> code; the failure rate is over 50%.
> {code:java}
> // existing CUSTOM_PARTITIONER test case (downscale from 3 to 2)
> new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
> // failing test case (upscale from 2 to 3)
> new Object[] {"upscale", Topology.CUSTOM_PARTITIONER, 2, 3, 0L},
> {code}
> I do not know why the test case only covered downscaling and not upscaling.
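>  
> To illustrate why upscaling is the interesting direction, here is a 
> minimal, self-contained sketch (a hypothetical modulo partitioner, not the 
> actual ITCase code): the same key maps to a different channel after 
> rescaling, so in-flight records captured by the unaligned checkpoint must 
> be re-routed correctly on recovery.
> {code:java}
> public class CustomPartitionerSketch {
>     // Mirrors the Partitioner#partition(key, numPartitions) contract;
>     // the modulo rule here is an assumption for illustration only.
>     static int partition(int key, int numPartitions) {
>         return key % numPartitions;
>     }
> 
>     public static void main(String[] args) {
>         int key = 5;
>         // Before rescaling (the "upscale" case starts at parallelism 2):
>         System.out.println(partition(key, 2)); // channel 1
>         // After rescaling to parallelism 3, the same key moves:
>         System.out.println(partition(key, 3)); // channel 2
>     }
> }
> {code}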
>  
> h2. More information:
> As I understand it, if a partitioner does not support unaligned checkpoints 
> (UC), it can fall back to aligned checkpoints (AC), just like the forward 
> partitioner [2], so checkpoints are merely slow for such partitioners.
> Currently, the data is corrupted for the custom partitioner:
> - a slow checkpoint is expected for a partitioner with UC disabled
> - data corruption is never expected
> That is why I think this should be treated as a bug.
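>  
> The expected behavior can be sketched as a simple decision (hypothetical 
> names, not Flink's actual API; the real mechanism lives in 
> StreamPartitioner [2]): when a partitioner cannot handle UC, the checkpoint 
> should fall back to AC (slow but safe) rather than corrupt in-flight data.
> {code:java}
> public class FallbackSketch {
>     interface PartitionerTraits {
>         // Hypothetical capability flag, for illustration only.
>         boolean supportsUnalignedCheckpoint();
>     }
> 
>     // Choose AC whenever the partitioner cannot handle UC.
>     static String checkpointMode(PartitionerTraits p, boolean ucEnabled) {
>         return (ucEnabled && p.supportsUnalignedCheckpoint())
>                 ? "UNALIGNED" : "ALIGNED";
>     }
> 
>     public static void main(String[] args) {
>         PartitionerTraits forwardLike = () -> false; // e.g. forward partitioner
>         PartitionerTraits shuffleLike = () -> true;
>         System.out.println(checkpointMode(forwardLike, true)); // ALIGNED
>         System.out.println(checkpointMode(shuffleLike, true)); // UNALIGNED
>     }
> }
> {code}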
>  
> [1] 
> [https://github.com/apache/flink/blob/9964ab4bd1b8334dec9388e1e4dac68c94488691/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java#L552]
> [2] 
> https://github.com/apache/flink/blob/4fb13082da9e15eaa20392db0f1ad21e83349cfa/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java#L89



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
