[
https://issues.apache.org/jira/browse/FLINK-9812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617593#comment-16617593
]
Nico Kruber commented on FLINK-9812:
------------------------------------
Nice find, [~yanghua].
Actually, setting {{spillingChannel = null}} here would just lead to a different
error: after looking at it once more, this loop is an incorrect use of the
{{SpillingAdaptiveSpanningRecordDeserializer}}:
{code}
for (SerializationTestType record : records) {
	serializedRecords.add(record);
	numRecords++;

	// serialize record
	if (serializer.addRecord(record).isFullBuffer()) {
		// buffer is full => start deserializing
		deserializer.setNextBuffer(serializationResult.buildBuffer());

		while (!serializedRecords.isEmpty()) {
			SerializationTestType expected = serializedRecords.poll();
			SerializationTestType actual = expected.getClass().newInstance();

			if (deserializer.getNextRecord(actual).isFullRecord()) {
				Assert.assertEquals(expected, actual);
				numRecords--;
			} else {
				serializedRecords.addFirst(expected);
				break;
			}
		}

		// move buffers as long as necessary (for long records)
		while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
			deserializer.setNextBuffer(serializationResult.buildBuffer());
			serializer.clear();
		}
	}
}
{code}
After calling {{deserializer.setNextBuffer()}}, the deserializer must first be
drained of all stored records before another buffer is added (see the sketch
after the list below):
- while spilling a spanning record, only {{getNextRecord()}} actually calls
{{SpanningWrapper#moveRemainderToNonSpanningDeserializer()}}, so adding more
buffers may silently corrupt existing data (from {{segmentRemaining}})
- the same applies without spilling
- without spanning records, this test does not run into this situation
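To make the intended usage concrete, here is a minimal sketch of how the loop
above could be restructured so that the deserializer is drained after every
{{setNextBuffer()}} call. It reuses the test's existing fields and helpers
({{serializationResult}}, {{segmentSize}}, {{setNextBufferForSerializer()}});
the {{drainDeserializer}} helper is hypothetical and merely factors out the
inner poll/assert loop from the quoted code:
{code}
// Sketch only, not the final fix: drain after EVERY setNextBuffer() call
// instead of handing the deserializer several buffers back-to-back.
for (SerializationTestType record : records) {
	serializedRecords.add(record);
	numRecords++;

	if (serializer.addRecord(record).isFullBuffer()) {
		deserializer.setNextBuffer(serializationResult.buildBuffer());
		numRecords -= drainDeserializer(deserializer, serializedRecords);

		// long records: hand over follow-up buffers one at a time,
		// draining in between instead of two setNextBuffer() calls in a row
		while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
			deserializer.setNextBuffer(serializationResult.buildBuffer());
			serializer.clear();
			numRecords -= drainDeserializer(deserializer, serializedRecords);
		}
	}
}

// Hypothetical helper wrapping the inner poll/assert loop from above; returns
// how many records were fully deserialized from the current buffer.
private static int drainDeserializer(
		RecordDeserializer<SerializationTestType> deserializer,
		ArrayDeque<SerializationTestType> serializedRecords) throws Exception {
	int drained = 0;
	while (!serializedRecords.isEmpty()) {
		SerializationTestType expected = serializedRecords.poll();
		SerializationTestType actual = expected.getClass().newInstance();
		if (deserializer.getNextRecord(actual).isFullRecord()) {
			Assert.assertEquals(expected, actual);
			drained++;
		} else {
			// spanning record: put it back and wait for the next buffer
			serializedRecords.addFirst(expected);
			break;
		}
	}
	return drained;
}
{code}
This keeps to the one-buffer-at-a-time contract the deserializer expects; the
actual fix may of course end up looking different.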
I'll prepare a fix for that.
> SpanningRecordSerializationTest fails on travis
> -----------------------------------------------
>
> Key: FLINK-9812
> URL: https://issues.apache.org/jira/browse/FLINK-9812
> Project: Flink
> Issue Type: Bug
> Components: Network, Tests, Type Serialization System
> Affects Versions: 1.6.0
> Reporter: Chesnay Schepler
> Priority: Critical
> Fix For: 1.7.0, 1.6.2
>
>
> https://travis-ci.org/zentol/flink/jobs/402744191
> {code}
> testHandleMixedLargeRecords(org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest)  Time elapsed: 6.113 sec  <<< ERROR!
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
> at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.addNextChunkFromMemorySegment(SpillingAdaptiveSpanningRecordDeserializer.java:529)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.access$200(SpillingAdaptiveSpanningRecordDeserializer.java:431)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.setNextBuffer(SpillingAdaptiveSpanningRecordDeserializer.java:76)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:149)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:115)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testHandleMixedLargeRecords(SpanningRecordSerializationTest.java:104)
> {code}