Vishal Palla created FLINK-33769:
------------------------------------
Summary: ExternalSorter hits "java.lang.RuntimeException: Error
obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due
to an exception: java.io.EOFException: Can't collect further: memorySource
depleted" when using custom serializer
Key: FLINK-33769
URL: https://issues.apache.org/jira/browse/FLINK-33769
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.17.2
Reporter: Vishal Palla
The [NormalizedKeySorter
class|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java]
is used to sort records in memory. It internally uses a
[SimpleCollectingOutputView|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java],
instantiated with a fixed chunk of managed memory, to store the records. When
the SimpleCollectingOutputView runs out of memory segments, it [throws an
EOFException|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java#L76].
The sorter is expected to [catch it in the write
method|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java#L298]
and return {{false}}, indicating that the sort buffer is full (as stated in the
method's javadoc). The issue is that the EOFException thrown by the
SimpleCollectingOutputView is first caught by the record serializer, which
offers no guarantee that the exception is propagated upwards unchanged. Kryo and
Thrift serializers, for example, wrap the caught exception in their own
exception classes and rethrow those; the sorter does not catch them, and the
job crashes.
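To make the failure mode concrete, here is a minimal, self-contained Java sketch of the mechanism using only JDK types. All names in it (BoundedOutputView, SerializerException, PassThroughSerializer, WrappingSerializer, tryWrite) are hypothetical stand-ins, not Flink, Kryo, or Thrift APIs, and the write method only approximates the sorter's contract of catching EOFException and returning false.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SorterEofDemo {

    /** Hypothetical stand-in for SimpleCollectingOutputView: fixed capacity, EOFException when full. */
    static final class BoundedOutputView {
        private final byte[] buffer;
        private int position;

        BoundedOutputView(int capacity) {
            this.buffer = new byte[capacity];
        }

        void write(byte[] bytes) throws IOException {
            if (position + bytes.length > buffer.length) {
                // Same kind of signal SimpleCollectingOutputView raises when its segments run out.
                throw new EOFException("Can't collect further: memorySource depleted");
            }
            System.arraycopy(bytes, 0, buffer, position, bytes.length);
            position += bytes.length;
        }
    }

    /** Hypothetical stand-in for a serializer-specific unchecked exception such as KryoException. */
    static final class SerializerException extends RuntimeException {
        SerializerException(Throwable cause) {
            super(cause);
        }
    }

    interface RecordSerializer {
        void serialize(String record, BoundedOutputView target) throws IOException;
    }

    /** Lets the EOFException propagate unchanged. */
    static final class PassThroughSerializer implements RecordSerializer {
        @Override
        public void serialize(String record, BoundedOutputView target) throws IOException {
            target.write(record.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Wraps any IOException in its own exception type, as the report describes for Kryo and Thrift. */
    static final class WrappingSerializer implements RecordSerializer {
        @Override
        public void serialize(String record, BoundedOutputView target) {
            try {
                target.write(record.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new SerializerException(e);
            }
        }
    }

    /** Approximates the sorter contract: false means "buffer full", anything else propagates. */
    static boolean write(String record, RecordSerializer serializer, BoundedOutputView view) throws IOException {
        try {
            serializer.serialize(record, view);
            return true;
        } catch (EOFException e) {
            return false;
        }
    }

    /** Runs one write and reports the outcome as a string. */
    static String tryWrite(String record, RecordSerializer serializer, BoundedOutputView view) {
        try {
            return write(record, serializer, view) ? "written" : "buffer full";
        } catch (IOException | RuntimeException e) {
            // In the real job, this is the point where the task fails.
            return "escaped: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        BoundedOutputView plain = new BoundedOutputView(8);
        System.out.println(tryWrite("12345678", new PassThroughSerializer(), plain)); // written
        System.out.println(tryWrite("9", new PassThroughSerializer(), plain));        // buffer full

        BoundedOutputView wrapped = new BoundedOutputView(8);
        System.out.println(tryWrite("12345678", new WrappingSerializer(), wrapped));  // written
        System.out.println(tryWrite("9", new WrappingSerializer(), wrapped));         // escaped: SerializerException
    }
}
```

With the pass-through serializer, the second write reports a full buffer, which is what lets the sorter spill and continue. With the wrapping serializer, the same EOFException arrives as a different type and escapes the catch block.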
Example stack trace:
{{java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:487)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:262)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1222)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:259)
... 9 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
at org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:256)
at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:397)
at org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleException(ThreadBase.java:121)
at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:75)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFException: Can't collect further: memorySource depleted
at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:80)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: Can't collect further: memorySource depleted
at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
at com.esotericsoftware.kryo.io.OutputChunked.flush(OutputChunked.java:45)
at com.esotericsoftware.kryo.io.OutputChunked.endChunks(OutputChunked.java:82)
at com.twitter.beam.coder.scala.ChillCoder.encode(ChillCoder.scala:101)
at com.twitter.eventwrangler.core.attribution.AttributionEventCoder.encode(AttributionEvent.scala:40)
at com.twitter.eventwrangler.core.attribution.AttributionEventCoder.encode(AttributionEvent.scala:22)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:74)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:607)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:598)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:558)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:110)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:297)
at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:77)
at org.apache.flink.runtime.operators.sort.ReadingThread.go(ReadingThread.java:69)
at org.apache.flink.runtime.operators.sort.ThreadBase.run(ThreadBase.java:73)
Caused by: java.io.EOFException: Can't collect further: memorySource depleted
at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:76)
at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:139)
at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:205)
at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:44)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
... 20 more}}
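In the trace above, the root EOFException sits under five layers of wrapping, and only the outermost type is visible to each catch block. The sketch below rebuilds that nesting with JDK types only (a plain RuntimeException stands in for KryoException and for WrappingRuntimeException; CauseChainDemo, findEof, and buildChain are hypothetical names) and walks getCause() to locate the root. A sorter that inspected the cause chain this way, rather than the exception's own type, could in principle treat the wrapped case as "buffer full". Whether that is safe depends on the serializer leaving no partial state behind after a failed write, which this sketch does not address.

```java
import java.io.EOFException;
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class CauseChainDemo {

    /** Returns the first EOFException reachable via getCause(), or null if there is none. */
    static EOFException findEof(Throwable t) {
        // Identity-based set so a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur instanceof EOFException) {
                return (EOFException) cur;
            }
        }
        return null;
    }

    /** Rebuilds the nesting from the trace, innermost first, using JDK stand-in types. */
    static Throwable buildChain() {
        EOFException root = new EOFException("Can't collect further: memorySource depleted");
        RuntimeException kryo = new RuntimeException(root); // stands in for KryoException
        IOException readingThread = new IOException(
                "Thread 'SortMerger Reading Thread' terminated due to an exception: " + kryo.getMessage(), kryo);
        RuntimeException sortedInput = new RuntimeException(
                "Error obtaining the sorted input: " + readingThread.getMessage(), readingThread);
        ExecutionException future = new ExecutionException(sortedInput);
        return new RuntimeException(future); // stands in for WrappingRuntimeException
    }

    public static void main(String[] args) {
        Throwable top = buildChain();
        EOFException root = findEof(top);
        System.out.println(root == null ? "no EOFException in chain" : "root cause: " + root.getMessage());
    }
}
```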