[ 
https://issues.apache.org/jira/browse/FLINK-39162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-39162:
----------------------------
    Description: 
While working on FLINK-39140 to improve the unaligned checkpoint ITCases, I 
encountered a high frequency of failures for the test case 
Topology.CUSTOM_PARTITIONER [1].

 
{code:java}
Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=1}
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:162)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:651)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:993)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:930)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:987)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:969)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:774)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:130)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
    at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
    at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
    at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:159)
    ... 10 more
Caused by: java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:389)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readByte(NonSpanningWrapper.java:112)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:201)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
{code}
 

 

The failure can be reproduced reliably by upscaling the custom-partitioner topology.
{code:java}
// existing CUSTOM_PARTITIONER test case (downscale)
new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
// failing test case (upscale)
new Object[] {"upscale", Topology.CUSTOM_PARTITIONER, 2, 3, 0L},
{code}
It is unclear why the test case only covered downscaling and not upscaling.

[1] 
[https://github.com/apache/flink/blob/9964ab4bd1b8334dec9388e1e4dac68c94488691/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java#L552]
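
For intuition on why rescaling is the trigger: with a custom partitioner, a record's target channel depends on the channel count, so an upscale reroutes most in-flight records that unaligned checkpoint recovery must redistribute. The sketch below is a hypothetical, self-contained illustration (not the partitioner used by UnalignedCheckpointRescaleITCase) of how a modulo-based routing rule remaps keys when going from 2 to 3 channels:
{code:java}
// Hypothetical sketch: a channel-count-dependent routing rule, showing that
// most keys map to a different channel after an upscale from 2 to 3 channels.
// The actual ITCase partitioner may differ; the channel() rule is an assumption.
public class PartitionerRescaleSketch {

    // Assumed routing rule: non-negative key modulo the number of output channels.
    static int channel(int key, int numChannels) {
        return Math.floorMod(key, numChannels);
    }

    public static void main(String[] args) {
        for (int key = 0; key < 6; key++) {
            System.out.printf(
                    "key=%d: parallelism 2 -> channel %d, parallelism 3 -> channel %d%n",
                    key, channel(key, 2), channel(key, 3));
        }
    }
}
{code}
Under this rule, keys 2 through 5 all move to a different channel on upscale, so records buffered for one channel in an unaligned checkpoint may belong to another channel after restore.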

  was:
When I am working on FLINK-39140 to improve the unaligned checkpoint ITCases, I 
encountered a high frequency of test failures for the test case : 
Topology.CUSTOM_PARTITIONER [1]

It could be reproduced reliably when upscale custom partitioner.
{code:java}
# existing CUSTOM_PARTITIONER test case
new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
# failure test case
new Object[] {"upscale", Topology.CUSTOM_PARTITIONER, 2, 3, 0L}, {code}
Do not know why the test case only covered downscaling and not upscaling.

[1] 
[https://github.com/apache/flink/blob/9964ab4bd1b8334dec9388e1e4dac68c94488691/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java#L552]


> The data of the custom partitioner is corrupted when unaligned checkpoint is 
> enabled
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-39162
>                 URL: https://issues.apache.org/jira/browse/FLINK-39162
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Rui Fan
>            Priority: Major
>
> While working on FLINK-39140 to improve the unaligned checkpoint ITCases, I 
> encountered a high frequency of failures for the test case 
> Topology.CUSTOM_PARTITIONER [1].
>  
> {code:java}
> Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=1}
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:162)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:651)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:993)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:930)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:987)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:969)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:774)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:130)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>     at org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:159)
>     ... 10 more
> Caused by: java.lang.IndexOutOfBoundsException
>     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:389)
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readByte(NonSpanningWrapper.java:112)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:201)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> {code}
>  
>  
> The failure can be reproduced reliably by upscaling the custom-partitioner topology.
> {code:java}
> // existing CUSTOM_PARTITIONER test case (downscale)
> new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
> // failing test case (upscale)
> new Object[] {"upscale", Topology.CUSTOM_PARTITIONER, 2, 3, 0L},
> {code}
> It is unclear why the test case only covered downscaling and not upscaling.
> [1] 
> [https://github.com/apache/flink/blob/9964ab4bd1b8334dec9388e1e4dac68c94488691/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java#L552]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
