[https://issues.apache.org/jira/browse/FLINK-17192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139375#comment-17139375]
Stephan Ewen commented on FLINK-17192:
--------------------------------------
Are you using Avro Specific Records? I would expect those to work, because
their type extraction is integrated fairly deeply and they do not need
external parameters. Generic Records cannot work, though, because they depend
on the Avro schema being specified explicitly.
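To illustrate why a class that carries its own schema needs no external parameter while a generic record does, here is a minimal, dependency-free sketch. It relies on one real Avro fact: generated specific record classes expose their schema in a static {{SCHEMA$}} field. Everything else ({{UserRecord}}, {{GenericRecordStandIn}}, {{resolveSchema}}) is hypothetical, the schema is a plain JSON string instead of an {{org.apache.avro.Schema}}, and this is not how Flink's Avro type extraction is implemented.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class AvroSchemaLookupSketch {

    // Hypothetical stand-in for an Avro-generated specific record class. Generated
    // classes carry their schema in a static SCHEMA$ field (an org.apache.avro.Schema);
    // a JSON string is used here to keep the sketch free of dependencies.
    static class UserRecord {
        public static final String SCHEMA$ =
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
        long id;
    }

    // Hypothetical stand-in for a generic record: values only, no schema on the class.
    static class GenericRecordStandIn {
        Object[] values;
    }

    /**
     * Resolves the schema for a record class: an explicitly passed schema wins;
     * otherwise the schema is read from the class itself, which only the
     * specific-record stand-in can provide.
     */
    static String resolveSchema(Class<?> recordClass, String explicitSchema) {
        if (explicitSchema != null) {
            return explicitSchema;
        }
        try {
            Field schemaField = recordClass.getField("SCHEMA$");
            if (Modifier.isStatic(schemaField.getModifiers())) {
                return String.valueOf(schemaField.get(null));
            }
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // No accessible static schema on the class: fall through to the error below.
        }
        throw new IllegalStateException(
            recordClass.getSimpleName() + " has no schema of its own; it must be specified explicitly");
    }

    public static void main(String[] args) {
        // Specific record: the schema is found without any external parameter.
        System.out.println(resolveSchema(UserRecord.class, null));
        // Generic record: works only when the schema is passed in.
        System.out.println(resolveSchema(GenericRecordStandIn.class,
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":[]}"));
        try {
            resolveSchema(GenericRecordStandIn.class, null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```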
*Fixing this in DataSet:*
The critical part here is passing the key serializer through the stack.
The DataSet API was originally designed on the assumption that there are no
explicit keys, only parameterized record comparators. So there is no explicit
key serializer at the point where the sorter is created. (We have moved away
from that design for the DataStream API. It is a nice design in theory, but it
eventually runs into problems like this one.)
We would need to extend {{CompositeType}} to create a serializer for the key
parts, similar to the way it creates a comparator.
That is fairly complex logic, though, because {{CompositeType}} implements a
simple logical schema over physical types.
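A minimal sketch of what "a serializer for the key parts, created the same way as the comparator" could look like, using only the JDK. {{FlatCompositeType}}, {{FieldType}} and {{KeySerializer}} are hypothetical names, not Flink's {{CompositeType}} API; the sketch covers only flat records with two field types and ignores nesting, POJO fields and nulls, which is where the real complexity lies.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;

public class CompositeKeySketch {

    /** Hypothetical physical field types of a flat record. */
    enum FieldType { INT, STRING }

    /** Hypothetical serializer for only the key fields of a record. */
    interface KeySerializer {
        byte[] serializeKey(Object[] record);
    }

    /** Hypothetical, much simplified composite type: a logical schema over Object[] records. */
    static final class FlatCompositeType {
        private final FieldType[] fieldTypes;

        FlatCompositeType(FieldType... fieldTypes) {
            this.fieldTypes = fieldTypes.clone();
        }

        /** What the DataSet design already provides: a comparator over the key fields of full records. */
        Comparator<Object[]> createComparator(int... keyFields) {
            int[] keys = keyFields.clone();
            return (a, b) -> {
                for (int pos : keys) {
                    int c = compareField(fieldTypes[pos], a[pos], b[pos]);
                    if (c != 0) {
                        return c;
                    }
                }
                return 0;
            };
        }

        /** The missing piece: a serializer for only the key fields, derived from the same positions. */
        KeySerializer createKeySerializer(int... keyFields) {
            int[] keys = keyFields.clone();
            return record -> {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (DataOutputStream out = new DataOutputStream(bytes)) {
                    for (int pos : keys) {
                        writeField(fieldTypes[pos], record[pos], out);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return bytes.toByteArray();
            };
        }

        private static int compareField(FieldType type, Object a, Object b) {
            switch (type) {
                case INT:
                    return Integer.compare((Integer) a, (Integer) b);
                case STRING:
                    return ((String) a).compareTo((String) b);
                default:
                    throw new IllegalArgumentException("Unsupported field type: " + type);
            }
        }

        private static void writeField(FieldType type, Object value, DataOutputStream out) throws IOException {
            switch (type) {
                case INT:
                    out.writeInt((Integer) value);
                    break;
                case STRING:
                    out.writeUTF((String) value);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported field type: " + type);
            }
        }
    }

    public static void main(String[] args) {
        // Logical schema (id: INT, name: STRING, payload: STRING); key = (id, name).
        FlatCompositeType type = new FlatCompositeType(FieldType.INT, FieldType.STRING, FieldType.STRING);
        Object[] a = {1, "a", "short payload"};
        Object[] b = {1, "a", "a much longer payload that is not part of the key"};
        System.out.println(type.createComparator(0, 1).compare(a, b));             // 0: same key
        System.out.println(type.createKeySerializer(0, 1).serializeKey(a).length); // 7: 4 (int) + 2 (length) + 1 ("a")
    }
}
```

Both factories read the same key positions from the same logical schema; the serialized key stays small no matter how large the non-key payload is.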
*In the DataStream API:*
We are working to have the DataStream API gradually subsume the functionality
of the DataSet API.
There we deal with explicit keys, so this problem should not exist.
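With an explicit key, the key type and therefore a key serializer are known when the sorter is set up. A minimal JDK-only sketch of one thing that enables: ordering records by the raw bytes of their serialized key, so the sorter needs no type-specific comparator. {{KeySerializer}}, {{KeyedRecord}} and {{sortBySerializedKey}} are hypothetical names; this illustrates the technique and is not the DataStream API's actual sorter.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SerializedKeySortSketch {

    /** Hypothetical serializer for an explicitly extracted key. */
    interface KeySerializer<K> {
        void serialize(K key, DataOutputStream out) throws IOException;
    }

    /** A record paired with the serialized bytes of its key. */
    private static final class KeyedRecord<T> {
        final byte[] keyBytes;
        final T record;

        KeyedRecord(byte[] keyBytes, T record) {
            this.keyBytes = keyBytes;
            this.record = record;
        }
    }

    /** Sorts records by the unsigned lexicographic order of their serialized keys. */
    static <T, K> List<T> sortBySerializedKey(
            List<T> records, Function<T, K> keySelector, KeySerializer<K> keySerializer) {
        List<KeyedRecord<T>> keyed = new ArrayList<>(records.size());
        for (T record : records) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                keySerializer.serialize(keySelector.apply(record), out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            keyed.add(new KeyedRecord<>(bytes.toByteArray(), record));
        }
        // List.sort is stable, so records with equal keys keep their input order.
        keyed.sort((x, y) -> Arrays.compareUnsigned(x.keyBytes, y.keyBytes));
        List<T> sorted = new ArrayList<>(keyed.size());
        for (KeyedRecord<T> k : keyed) {
            sorted.add(k.record);
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("b:2", "a:1", "b:3", "a:4");
        List<String> sorted = sortBySerializedKey(
                records,
                (String r) -> r.substring(0, r.indexOf(':')),            // explicit key: text before ':'
                (String key, DataOutputStream out) -> out.writeUTF(key)); // key serializer
        System.out.println(sorted); // [a:1, a:4, b:2, b:3]
    }
}
```

Byte order keeps equal keys adjacent, which is what grouping needs, but it is not necessarily the natural order of the key type: for example, {{writeInt}} places negative ints after positive ones under unsigned comparison. {{Arrays.compareUnsigned(byte[], byte[])}} requires Java 9 or later.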
> java.lang.RuntimeException: Error obtaining the sorted input: Thread
> 'SortMerger Reading Thread' terminated due to an exception: The record
> exceeds the maximum size of a sort buffer (current maximum: 72351744 bytes)
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-17192
> URL: https://issues.apache.org/jira/browse/FLINK-17192
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.8.2, 1.9.0, 1.10.0, 1.11.0
> Environment: flink 1.8.2
> Reporter: Nikola
> Priority: Critical
>
> We are trying to sort our data and we get the following exception:
> {code:java}
> java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 72351744 bytes).
> 	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> 	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
> 	at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:82)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 72351744 bytes).
> 	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> Caused by: java.io.IOException: The record exceeds the maximum size of a sort buffer (current maximum: 72351744 bytes).
> 	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:986)
> 	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> {code}
>
> The sort buffer is reported as "72351744 bytes", which is exactly 69 MiB
> (about 72 MB). Our TM has 30GB of memory configured for Flink.
> I searched for the same issue, but nothing really answered my question, and I
> could not find it on JIRA either. I did find this thread:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Any-way-to-increase-sort-buffer-size-td10152.html]
> It is from 2016, and Flink has evolved since then, so I am not sure it still
> applies.
>
> Is it possible to increase this sort buffer?
>
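The error message suggests that each record has to fit into a single sort buffer, so the total TaskManager memory is not the limit that matters here: 72351744 bytes is exactly 69 × 1024 × 1024 bytes (69 MiB), presumably the capacity of one buffer carved out of a share of the available memory. A minimal sketch of that check, assuming a record that does not fit triggers a switch to a fresh buffer and that a record larger than an empty buffer fails with the message seen in the trace. {{SortBuffer}} and {{add}} are hypothetical and greatly simplified; this is not {{UnilateralSortMerger}}'s code.

```java
import java.io.IOException;

public class SortBufferCapacitySketch {

    /** Capacity from the error message above: 72351744 bytes, exactly 69 MiB. */
    static final long REPORTED_CAPACITY_BYTES = 72_351_744L;

    /** Hypothetical fixed-capacity sort buffer that only tracks how many bytes it holds. */
    static final class SortBuffer {
        private final long capacity;
        private long used;

        SortBuffer(long capacity) {
            this.capacity = capacity;
        }

        /** Returns false if the record does not fit into the remaining space. */
        boolean write(long recordSizeBytes) {
            if (used + recordSizeBytes > capacity) {
                return false;
            }
            used += recordSizeBytes;
            return true;
        }

        boolean isEmpty() {
            return used == 0;
        }

        long getCapacity() {
            return capacity;
        }
    }

    /**
     * Reading-thread sketch: if a record does not fit, move on to a fresh buffer
     * (the full one would be handed to the sorter). If the record does not even
     * fit into an empty buffer, switching buffers cannot help, so fail.
     */
    static SortBuffer add(SortBuffer current, long recordSizeBytes) throws IOException {
        if (current.write(recordSizeBytes)) {
            return current;
        }
        if (!current.isEmpty()) {
            SortBuffer fresh = new SortBuffer(current.getCapacity());
            if (fresh.write(recordSizeBytes)) {
                return fresh;
            }
        }
        throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: "
                + current.getCapacity() + " bytes).");
    }

    public static void main(String[] args) throws IOException {
        System.out.println(REPORTED_CAPACITY_BYTES / (1024 * 1024) + " MiB"); // 69 MiB
        SortBuffer buffer = new SortBuffer(REPORTED_CAPACITY_BYTES);
        buffer = add(buffer, 60L * 1024 * 1024); // fits into the empty buffer
        buffer = add(buffer, 60L * 1024 * 1024); // does not fit: continues in a fresh buffer
        try {
            add(buffer, 80L * 1024 * 1024);      // larger than an empty buffer
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```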
--
This message was sent by Atlassian Jira
(v8.3.4#803005)