dawidwys commented on a change in pull request #13357:
URL: https://github.com/apache/flink/pull/13357#discussion_r492522939



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CircularQueues.java
##########
@@ -72,16 +76,23 @@ public void sendResult(MutableObjectIterator<E> result) {
        }
 
        @Override
-       public CircularElement<E> take(StageRunner.SortStage stage) {
-               try {
-                       return getQueue(stage).take();
-               } catch (InterruptedException e) {
-                       throw new WrappingRuntimeException(e);
+       public CircularElement<E> take(StageRunner.SortStage stage) throws 
InterruptedException {
+               while (!isFinished) {
+                       CircularElement<E> value = getQueue(stage).poll(1, 
TimeUnit.SECONDS);
+                       if (value != null) {
+                               return value;
+                       }
                }
+               throw new FlinkRuntimeException("The sorter is closed already");
        }
 
        @Override
        public CircularElement<E> poll(StageRunner.SortStage stage) {
                return getQueue(stage).poll();
        }
+
+       @Override
+       public void close() throws Exception {

Review comment:
       I am so sorry about that. I completely forgot about calling this method 
in the `ExternalSorter#close`. I added it there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to