[ 
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568270#comment-16568270
 ] 

ASF GitHub Bot commented on FLINK-9969:
---------------------------------------

asfgit closed pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters 
created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index f3bea87a039..c450880f98b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -176,6 +176,15 @@ public MemoryManager(long memorySize, int numberOfSlots, 
int pageSize,
                        default:
                                throw new 
IllegalArgumentException("unrecognized memory type: " + memoryType);
                }
+
+               LOG.debug("Initialized MemoryManager with total memory size {}, 
number of slots {}, page size {}, " +
+                               "memory type {}, pre allocate memory {} and 
number of non allocated pages {}.",
+                       memorySize,
+                       numberOfSlots,
+                       pageSize,
+                       memoryType,
+                       preAllocateMemory,
+                       numNonAllocatedPages);
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
new file mode 100644
index 00000000000..673ac41e00b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Default factory for {@link InMemorySorter}.
+ */
+public class DefaultInMemorySorterFactory<T> implements 
InMemorySorterFactory<T> {
+
+       @Nonnull
+       private final TypeSerializerFactory<T> typeSerializerFactory;
+
+       @Nonnull
+       private final TypeComparator<T> typeComparator;
+
+       private final boolean useFixedLengthRecordSorter;
+
+       DefaultInMemorySorterFactory(
+                       @Nonnull TypeSerializerFactory<T> typeSerializerFactory,
+                       @Nonnull TypeComparator<T> typeComparator,
+                       int thresholdForInPlaceSorting) {
+               this.typeSerializerFactory = typeSerializerFactory;
+               this.typeComparator = typeComparator;
+
+               TypeSerializer<T> typeSerializer = 
typeSerializerFactory.getSerializer();
+
+               this.useFixedLengthRecordSorter = 
typeComparator.supportsSerializationWithKeyNormalization() &&
+                       typeSerializer.getLength() > 0 && 
typeSerializer.getLength() <= thresholdForInPlaceSorting;
+       }
+
+       @Override
+       public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+               final TypeSerializer<T> typeSerializer = 
typeSerializerFactory.getSerializer();
+               final TypeComparator<T> duplicateTypeComparator = 
typeComparator.duplicate();
+
+               if (useFixedLengthRecordSorter) {
+                       return new FixedLengthRecordSorter<>(typeSerializer, 
duplicateTypeComparator, sortSegments);
+               } else {
+                       return new NormalizedKeySorter<>(typeSerializer, 
duplicateTypeComparator, sortSegments);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
new file mode 100644
index 00000000000..e15f5d8aea5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InMemorySorter}.
+ */
+public interface InMemorySorterFactory<T> {
+
+       /**
+        * Create an {@link InMemorySorter} instance with the given memory 
segments.
+        *
+        * @param sortSegments to initialize the InMemorySorter with
+        * @return new InMemorySorter instance
+        */
+       InMemorySorter<T> create(List<MemorySegment> sortSegments);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 934eeb7c2b4..1c9a8d77a30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -151,6 +152,8 @@
         */
        protected final boolean objectReuseEnabled;
 
+       private final Collection<InMemorySorter<?>> inMemorySorters;
+
        // 
------------------------------------------------------------------------
        //                         Constructor & Shutdown
        // 
------------------------------------------------------------------------
@@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager 
memoryManager, List<MemorySegment>
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        int numSortBuffers, int maxNumFileHandles,
                        float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords,
-                       boolean objectReuseEnabled)
-       throws IOException
-       {
+                       boolean objectReuseEnabled) throws IOException {
+               this (
+                       memoryManager,
+                       memory,
+                       ioManager,
+                       input,
+                       parentTask,
+                       serializerFactory,
+                       comparator,
+                       numSortBuffers,
+                       maxNumFileHandles,
+                       startSpillingFraction,
+                       noSpillingMemory,
+                       handleLargeRecords,
+                       objectReuseEnabled,
+                       new DefaultInMemorySorterFactory<>(serializerFactory, 
comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+       }
+
+       protected UnilateralSortMerger(
+                       MemoryManager memoryManager,
+                       List<MemorySegment> memory,
+                       IOManager ioManager,
+                       MutableObjectIterator<E> input,
+                       AbstractInvokable parentTask,
+                       TypeSerializerFactory<E> serializerFactory,
+                       TypeComparator<E> comparator,
+                       int numSortBuffers,
+                       int maxNumFileHandles,
+                       float startSpillingFraction,
+                       boolean noSpillingMemory,
+                       boolean handleLargeRecords,
+                       boolean objectReuseEnabled,
+                       InMemorySorterFactory<E> inMemorySorterFactory) throws 
IOException {
                // sanity checks
                if (memoryManager == null || (ioManager == null && 
!noSpillingMemory) || serializerFactory == null || comparator == null) {
                        throw new NullPointerException();
@@ -330,6 +363,8 @@ protected UnilateralSortMerger(MemoryManager memoryManager, 
List<MemorySegment>
                
                // circular queues pass buffers between the threads
                final CircularQueues<E> circularQueues = new 
CircularQueues<E>();
+
+               inMemorySorters = new ArrayList<>(numSortBuffers);
                
                // allocate the sort buffers and fill empty queue with them
                final Iterator<MemorySegment> segments = 
this.sortReadMemory.iterator();
@@ -341,20 +376,11 @@ protected UnilateralSortMerger(MemoryManager 
memoryManager, List<MemorySegment>
                                sortSegments.add(segments.next());
                        }
                        
-                       final TypeComparator<E> comp = comparator.duplicate();
-                       final InMemorySorter<E> buffer;
-                       
-                       // instantiate a fix-length in-place sorter, if 
possible, otherwise the out-of-place sorter
-                       if (comp.supportsSerializationWithKeyNormalization() &&
-                                       serializer.getLength() > 0 && 
serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-                       {
-                               buffer = new 
FixedLengthRecordSorter<E>(serializerFactory.getSerializer(), comp, 
sortSegments);
-                       } else {
-                               buffer = new 
NormalizedKeySorter<E>(serializerFactory.getSerializer(), comp, sortSegments);
-                       }
+                       final InMemorySorter<E> inMemorySorter = 
inMemorySorterFactory.create(sortSegments);
+                       inMemorySorters.add(inMemorySorter);
 
                        // add to empty queue
-                       CircularElement<E> element = new CircularElement<E>(i, 
buffer, sortSegments);
+                       CircularElement<E> element = new CircularElement<E>(i, 
inMemorySorter, sortSegments);
                        circularQueues.empty.add(element);
                }
 
@@ -494,7 +520,12 @@ public void close() {
                        }
                }
                finally {
-                       
+
+                       // Dispose all in memory sorter in order to clear 
memory references
+                       for (InMemorySorter<?> inMemorySorter : 
inMemorySorters) {
+                               inMemorySorter.dispose();
+                       }
+
                        // RELEASE ALL MEMORY. If the threads and channels are 
still running, this should cause
                        // exceptions, because their memory segments are freed
                        try {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 60b2ed8fee7..92ae1676ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -78,6 +78,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -243,7 +244,9 @@
        /** atomic flag that makes sure the invokable is canceled exactly once 
upon error. */
        private final AtomicBoolean invokableHasBeenCanceled;
 
-       /** The invokable of this task, if initialized. */
+       /** The invokable of this task, if initialized. All accesses must copy 
the reference and
+        * check for null, as this field is cleared as part of the disposal 
logic. */
+       @Nullable
        private volatile AbstractInvokable invokable;
 
        /** The current execution state of the task. */
@@ -473,6 +476,12 @@ long getTaskCancellationTimeout() {
                return taskCancellationTimeout;
        }
 
+       @Nullable
+       @VisibleForTesting
+       AbstractInvokable getInvokable() {
+               return invokable;
+       }
+
        // 
------------------------------------------------------------------------
        //  Task Execution
        // 
------------------------------------------------------------------------
@@ -762,7 +771,7 @@ else if (current == ExecutionState.CANCELING) {
                                        if (current == ExecutionState.RUNNING 
|| current == ExecutionState.DEPLOYING) {
                                                if (t instanceof 
CancelTaskException) {
                                                        if 
(transitionState(current, ExecutionState.CANCELED)) {
-                                                               
cancelInvokable();
+                                                               
cancelInvokable(invokable);
 
                                                                
notifyObservers(ExecutionState.CANCELED, null);
                                                                break;
@@ -773,7 +782,7 @@ else if (current == ExecutionState.CANCELING) {
                                                                // proper 
failure of the task. record the exception as the root cause
                                                                String 
errorMessage = String.format("Execution of %s (%s) failed.", 
taskNameWithSubtask, executionId);
                                                                failureCause = 
t;
-                                                               
cancelInvokable();
+                                                               
cancelInvokable(invokable);
 
                                                                
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                                                                break;
@@ -808,6 +817,10 @@ else if (transitionState(current, ExecutionState.FAILED, 
t)) {
                        try {
                                LOG.info("Freeing task resources for {} ({}).", 
taskNameWithSubtask, executionId);
 
+                               // clear the reference to the invokable. this 
helps guard against holding references
+                               // to the invokable and its structures in cases 
where this Task object is still referenced
+                               this.invokable = null;
+
                                // stop the async dispatcher.
                                // copy dispatcher reference to stack, against 
concurrent release
                                ExecutorService dispatcher = 
this.asyncCallDispatcher;
@@ -924,18 +937,20 @@ private boolean transitionState(ExecutionState 
currentState, ExecutionState newS
         * @throws IllegalStateException if the {@link Task} is not yet running
         */
        public void stopExecution() {
+               // copy reference to stack, to guard against concurrent setting 
to null
+               final AbstractInvokable invokable = this.invokable;
+
                if (invokable != null) {
-                       LOG.info("Attempting to stop task {} ({}).", 
taskNameWithSubtask, executionId);
                        if (invokable instanceof StoppableTask) {
-                               Runnable runnable = new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       ((StoppableTask) 
invokable).stop();
-                                               } catch (RuntimeException e) {
-                                                       LOG.error("Stopping 
task {} ({}) failed.", taskNameWithSubtask, executionId, e);
-                                                       
taskManagerActions.failTask(executionId, e);
-                                               }
+                               LOG.info("Attempting to stop task {} ({}).", 
taskNameWithSubtask, executionId);
+                               final StoppableTask stoppable = (StoppableTask) 
invokable;
+
+                               Runnable runnable = () -> {
+                                       try {
+                                               stoppable.stop();
+                                       } catch (Throwable t) {
+                                               LOG.error("Stopping task {} 
({}) failed.", taskNameWithSubtask, executionId, t);
+                                               
taskManagerActions.failTask(executionId, t);
                                        }
                                };
                                executeAsyncCallRunnable(runnable, 
String.format("Stopping source task %s (%s).", taskNameWithSubtask, 
executionId));
@@ -945,7 +960,7 @@ public void run() {
                } else {
                        throw new IllegalStateException(
                                String.format(
-                                       "Cannot stop task %s (%s) because it is 
not yet running.",
+                                       "Cannot stop task %s (%s) because it is 
not running.",
                                        taskNameWithSubtask,
                                        executionId));
                }
@@ -1010,6 +1025,10 @@ else if (current == ExecutionState.RUNNING) {
                                if (transitionState(ExecutionState.RUNNING, 
targetState, cause)) {
                                        // we are canceling / failing out of 
the running state
                                        // we need to cancel the invokable
+
+                                       // copy reference to guard against 
concurrent null-ing out the reference
+                                       final AbstractInvokable invokable = 
this.invokable;
+
                                        if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                                                this.failureCause = cause;
                                                notifyObservers(
@@ -1363,9 +1382,9 @@ private void executeAsyncCallRunnable(Runnable runnable, 
String callName) {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private void cancelInvokable() {
+       private void cancelInvokable(AbstractInvokable invokable) {
                // in case of an exception during execution, we still call 
"cancel()" on the task
-               if (invokable != null && this.invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
+               if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                        try {
                                invokable.cancel();
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index fa675f69e24..d32cad05cdd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -18,21 +18,11 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
 import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -47,9 +37,20 @@
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 public class CombiningUnilateralSortMergerITCase extends TestLogger {
        
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
new file mode 100644
index 00000000000..f5a56bce4e1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link UnilateralSortMerger}.
+ */
+public class UnilateralSortMergerTest extends TestLogger {
+
+       @Test
+       public void testInMemorySorterDisposal() throws Exception {
+               final TestingInMemorySorterFactory<Tuple2<Integer, Integer>> 
inMemorySorterFactory = new TestingInMemorySorterFactory<>();
+
+               final int numPages = 32;
+               final MemoryManager memoryManager = new 
MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1);
+               final DummyInvokable parentTask = new DummyInvokable();
+               final UnilateralSortMerger<Tuple2<Integer, Integer>> 
unilateralSortMerger = new UnilateralSortMerger<>(
+                       memoryManager,
+                       memoryManager.allocatePages(parentTask, numPages),
+                       new IOManagerAsync(),
+                       EmptyMutableObjectIterator.get(),
+                       parentTask,
+                       TestData.getIntIntTupleSerializerFactory(),
+                       TestData.getIntIntTupleComparator(),
+                       10,
+                       2,
+                       1.0f,
+                       true,
+                       false,
+                       false,
+                       inMemorySorterFactory);
+
+               final Collection<TestingInMemorySorter<?>> inMemorySorters = 
inMemorySorterFactory.getInMemorySorters();
+
+               assertThat(inMemorySorters, is(not(empty())));
+
+               unilateralSortMerger.close();
+
+               assertThat(unilateralSortMerger.closed, is(true));
+
+               for (TestingInMemorySorter<?> inMemorySorter : inMemorySorters) 
{
+                       assertThat(inMemorySorter.isDisposed(), is(true));
+               }
+       }
+
+       private static final class TestingInMemorySorterFactory<T> implements 
InMemorySorterFactory<T> {
+
+               private final Collection<TestingInMemorySorter<?>> 
inMemorySorters = new ArrayList<>(10);
+
+               Collection<TestingInMemorySorter<?>> getInMemorySorters() {
+                       return inMemorySorters;
+               }
+
+               @Override
+               public InMemorySorter<T> create(List<MemorySegment> 
sortSegments) {
+                       final TestingInMemorySorter<T> testingInMemorySorter = 
new TestingInMemorySorter<>();
+                       inMemorySorters.add(testingInMemorySorter);
+                       return testingInMemorySorter;
+               }
+       }
+
+       private static final class TestingInMemorySorter<T> implements 
InMemorySorter<T> {
+
+               private volatile boolean isDisposed;
+
+               public boolean isDisposed() {
+                       return isDisposed;
+               }
+
+               @Override
+               public void reset() {
+
+               }
+
+               @Override
+               public boolean isEmpty() {
+                       return false;
+               }
+
+               @Override
+               public void dispose() {
+                       isDisposed = true;
+               }
+
+               @Override
+               public long getCapacity() {
+                       return 0;
+               }
+
+               @Override
+               public long getOccupancy() {
+                       return 0;
+               }
+
+               @Override
+               public T getRecord(int logicalPosition) throws IOException {
+                       return null;
+               }
+
+               @Override
+               public T getRecord(T reuse, int logicalPosition) throws 
IOException {
+                       return null;
+               }
+
+               @Override
+               public boolean write(T record) throws IOException {
+                       return false;
+               }
+
+               @Override
+               public MutableObjectIterator<T> getIterator() {
+                       return null;
+               }
+
+               @Override
+               public void writeToOutput(ChannelWriterOutputView output) 
throws IOException {
+
+               }
+
+               @Override
+               public void writeToOutput(ChannelWriterOutputView output, 
LargeRecordHandler<T> largeRecordsOutput) throws IOException {
+
+               }
+
+               @Override
+               public void writeToOutput(ChannelWriterOutputView output, int 
start, int num) throws IOException {
+
+               }
+
+               @Override
+               public int compare(int i, int j) {
+                       return 0;
+               }
+
+               @Override
+               public int compare(int segmentNumberI, int segmentOffsetI, int 
segmentNumberJ, int segmentOffsetJ) {
+                       return 0;
+               }
+
+               @Override
+               public void swap(int i, int j) {
+
+               }
+
+               @Override
+               public void swap(int segmentNumberI, int segmentOffsetI, int 
segmentNumberJ, int segmentOffsetJ) {
+
+               }
+
+               @Override
+               public int size() {
+                       return 0;
+               }
+
+               @Override
+               public int recordSize() {
+                       return 0;
+               }
+
+               @Override
+               public int recordsPerSegment() {
+                       return 0;
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1829e977489..3dfcfb3b059 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -174,6 +174,7 @@ public void testRegularExecution() {
                        assertEquals(ExecutionState.FINISHED, 
task.getExecutionState());
                        assertFalse(task.isCanceledOrFailed());
                        assertNull(task.getFailureCause());
+                       assertNull(task.getInvokable());
 
                        // verify listener messages
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
@@ -202,6 +203,8 @@ public void testCancelRightAway() {
                        // verify final state
                        assertEquals(ExecutionState.CANCELED, 
task.getExecutionState());
                        validateUnregisterTask(task.getExecutionId());
+
+                       assertNull(task.getInvokable());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -258,6 +261,8 @@ public void testLibraryCacheRegistrationFailed() {
 
                        // make sure that the TaskManager received an message 
to unregister the task
                        validateUnregisterTask(task.getExecutionId());
+
+                       assertNull(task.getInvokable());
                }
                catch (Exception e) {
                        e.printStackTrace();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Unreasonable memory requirements to complete examples/batch/WordCount
> ---------------------------------------------------------------------
>
>                 Key: FLINK-9969
>                 URL: https://issues.apache.org/jira/browse/FLINK-9969
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>            Reporter: Piotr Nowojski
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.3, 1.6.0
>
>         Attachments: yarn_logs
>
>
> setup on AWS EMR:
>  * 5 worker nodes (m4.4xlarge nodes) 
>  * 1 master node (m4.large)
> following command fails with out of memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 
> examples/batch/WordCount.jar{noformat}
> Only increasing memory over 17.2GB example completes. At the same time after 
> disabling flip6 following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 
> examples/batch/WordCount.jar{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to