[
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568270#comment-16568270
]
ASF GitHub Bot commented on FLINK-9969:
---------------------------------------
asfgit closed pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters
created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index f3bea87a039..c450880f98b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -176,6 +176,15 @@ public MemoryManager(long memorySize, int numberOfSlots,
int pageSize,
default:
throw new
IllegalArgumentException("unrecognized memory type: " + memoryType);
}
+
+ LOG.debug("Initialized MemoryManager with total memory size {},
number of slots {}, page size {}, " +
+ "memory type {}, pre allocate memory {} and
number of non allocated pages {}.",
+ memorySize,
+ numberOfSlots,
+ pageSize,
+ memoryType,
+ preAllocateMemory,
+ numNonAllocatedPages);
}
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
new file mode 100644
index 00000000000..673ac41e00b
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Default factory for {@link InMemorySorter}.
+ */
+public class DefaultInMemorySorterFactory<T> implements
InMemorySorterFactory<T> {
+
+ @Nonnull
+ private final TypeSerializerFactory<T> typeSerializerFactory;
+
+ @Nonnull
+ private final TypeComparator<T> typeComparator;
+
+ private final boolean useFixedLengthRecordSorter;
+
+ DefaultInMemorySorterFactory(
+ @Nonnull TypeSerializerFactory<T> typeSerializerFactory,
+ @Nonnull TypeComparator<T> typeComparator,
+ int thresholdForInPlaceSorting) {
+ this.typeSerializerFactory = typeSerializerFactory;
+ this.typeComparator = typeComparator;
+
+ TypeSerializer<T> typeSerializer =
typeSerializerFactory.getSerializer();
+
+ this.useFixedLengthRecordSorter =
typeComparator.supportsSerializationWithKeyNormalization() &&
+ typeSerializer.getLength() > 0 &&
typeSerializer.getLength() <= thresholdForInPlaceSorting;
+ }
+
+ @Override
+ public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+ final TypeSerializer<T> typeSerializer =
typeSerializerFactory.getSerializer();
+ final TypeComparator<T> duplicateTypeComparator =
typeComparator.duplicate();
+
+ if (useFixedLengthRecordSorter) {
+ return new FixedLengthRecordSorter<>(typeSerializer,
duplicateTypeComparator, sortSegments);
+ } else {
+ return new NormalizedKeySorter<>(typeSerializer,
duplicateTypeComparator, sortSegments);
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
new file mode 100644
index 00000000000..e15f5d8aea5
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InMemorySorter}.
+ */
+public interface InMemorySorterFactory<T> {
+
+ /**
+ * Create an {@link InMemorySorter} instance with the given memory
segments.
+ *
+ * @param sortSegments to initialize the InMemorySorter with
+ * @return new InMemorySorter instance
+ */
+ InMemorySorter<T> create(List<MemorySegment> sortSegments);
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 934eeb7c2b4..1c9a8d77a30 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -151,6 +152,8 @@
*/
protected final boolean objectReuseEnabled;
+ private final Collection<InMemorySorter<?>> inMemorySorters;
+
//
------------------------------------------------------------------------
// Constructor & Shutdown
//
------------------------------------------------------------------------
@@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager
memoryManager, List<MemorySegment>
TypeSerializerFactory<E> serializerFactory,
TypeComparator<E> comparator,
int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean noSpillingMemory,
boolean handleLargeRecords,
- boolean objectReuseEnabled)
- throws IOException
- {
+ boolean objectReuseEnabled) throws IOException {
+ this (
+ memoryManager,
+ memory,
+ ioManager,
+ input,
+ parentTask,
+ serializerFactory,
+ comparator,
+ numSortBuffers,
+ maxNumFileHandles,
+ startSpillingFraction,
+ noSpillingMemory,
+ handleLargeRecords,
+ objectReuseEnabled,
+ new DefaultInMemorySorterFactory<>(serializerFactory,
comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+ }
+
+ protected UnilateralSortMerger(
+ MemoryManager memoryManager,
+ List<MemorySegment> memory,
+ IOManager ioManager,
+ MutableObjectIterator<E> input,
+ AbstractInvokable parentTask,
+ TypeSerializerFactory<E> serializerFactory,
+ TypeComparator<E> comparator,
+ int numSortBuffers,
+ int maxNumFileHandles,
+ float startSpillingFraction,
+ boolean noSpillingMemory,
+ boolean handleLargeRecords,
+ boolean objectReuseEnabled,
+ InMemorySorterFactory<E> inMemorySorterFactory) throws
IOException {
// sanity checks
if (memoryManager == null || (ioManager == null &&
!noSpillingMemory) || serializerFactory == null || comparator == null) {
throw new NullPointerException();
@@ -330,6 +363,8 @@ protected UnilateralSortMerger(MemoryManager memoryManager,
List<MemorySegment>
// circular queues pass buffers between the threads
final CircularQueues<E> circularQueues = new
CircularQueues<E>();
+
+ inMemorySorters = new ArrayList<>(numSortBuffers);
// allocate the sort buffers and fill empty queue with them
final Iterator<MemorySegment> segments =
this.sortReadMemory.iterator();
@@ -341,20 +376,11 @@ protected UnilateralSortMerger(MemoryManager
memoryManager, List<MemorySegment>
sortSegments.add(segments.next());
}
- final TypeComparator<E> comp = comparator.duplicate();
- final InMemorySorter<E> buffer;
-
- // instantiate a fix-length in-place sorter, if
possible, otherwise the out-of-place sorter
- if (comp.supportsSerializationWithKeyNormalization() &&
- serializer.getLength() > 0 &&
serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
- {
- buffer = new
FixedLengthRecordSorter<E>(serializerFactory.getSerializer(), comp,
sortSegments);
- } else {
- buffer = new
NormalizedKeySorter<E>(serializerFactory.getSerializer(), comp, sortSegments);
- }
+ final InMemorySorter<E> inMemorySorter =
inMemorySorterFactory.create(sortSegments);
+ inMemorySorters.add(inMemorySorter);
// add to empty queue
- CircularElement<E> element = new CircularElement<E>(i,
buffer, sortSegments);
+ CircularElement<E> element = new CircularElement<E>(i,
inMemorySorter, sortSegments);
circularQueues.empty.add(element);
}
@@ -494,7 +520,12 @@ public void close() {
}
}
finally {
-
+
+ // Dispose all in memory sorter in order to clear
memory references
+ for (InMemorySorter<?> inMemorySorter :
inMemorySorters) {
+ inMemorySorter.dispose();
+ }
+
// RELEASE ALL MEMORY. If the threads and channels are
still running, this should cause
// exceptions, because their memory segments are freed
try {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 60b2ed8fee7..92ae1676ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -78,6 +78,7 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -243,7 +244,9 @@
/** atomic flag that makes sure the invokable is canceled exactly once
upon error. */
private final AtomicBoolean invokableHasBeenCanceled;
- /** The invokable of this task, if initialized. */
+ /** The invokable of this task, if initialized. All accesses must copy
the reference and
+ * check for null, as this field is cleared as part of the disposal
logic. */
+ @Nullable
private volatile AbstractInvokable invokable;
/** The current execution state of the task. */
@@ -473,6 +476,12 @@ long getTaskCancellationTimeout() {
return taskCancellationTimeout;
}
+ @Nullable
+ @VisibleForTesting
+ AbstractInvokable getInvokable() {
+ return invokable;
+ }
+
//
------------------------------------------------------------------------
// Task Execution
//
------------------------------------------------------------------------
@@ -762,7 +771,7 @@ else if (current == ExecutionState.CANCELING) {
if (current == ExecutionState.RUNNING
|| current == ExecutionState.DEPLOYING) {
if (t instanceof
CancelTaskException) {
if
(transitionState(current, ExecutionState.CANCELED)) {
-
cancelInvokable();
+
cancelInvokable(invokable);
notifyObservers(ExecutionState.CANCELED, null);
break;
@@ -773,7 +782,7 @@ else if (current == ExecutionState.CANCELING) {
// proper
failure of the task. record the exception as the root cause
String
errorMessage = String.format("Execution of %s (%s) failed.",
taskNameWithSubtask, executionId);
failureCause =
t;
-
cancelInvokable();
+
cancelInvokable(invokable);
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
break;
@@ -808,6 +817,10 @@ else if (transitionState(current, ExecutionState.FAILED,
t)) {
try {
LOG.info("Freeing task resources for {} ({}).",
taskNameWithSubtask, executionId);
+ // clear the reference to the invokable. this
helps guard against holding references
+ // to the invokable and its structures in cases
where this Task object is still referenced
+ this.invokable = null;
+
// stop the async dispatcher.
// copy dispatcher reference to stack, against
concurrent release
ExecutorService dispatcher =
this.asyncCallDispatcher;
@@ -924,18 +937,20 @@ private boolean transitionState(ExecutionState
currentState, ExecutionState newS
* @throws IllegalStateException if the {@link Task} is not yet running
*/
public void stopExecution() {
+ // copy reference to stack, to guard against concurrent setting
to null
+ final AbstractInvokable invokable = this.invokable;
+
if (invokable != null) {
- LOG.info("Attempting to stop task {} ({}).",
taskNameWithSubtask, executionId);
if (invokable instanceof StoppableTask) {
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- ((StoppableTask)
invokable).stop();
- } catch (RuntimeException e) {
- LOG.error("Stopping
task {} ({}) failed.", taskNameWithSubtask, executionId, e);
-
taskManagerActions.failTask(executionId, e);
- }
+ LOG.info("Attempting to stop task {} ({}).",
taskNameWithSubtask, executionId);
+ final StoppableTask stoppable = (StoppableTask)
invokable;
+
+ Runnable runnable = () -> {
+ try {
+ stoppable.stop();
+ } catch (Throwable t) {
+ LOG.error("Stopping task {}
({}) failed.", taskNameWithSubtask, executionId, t);
+
taskManagerActions.failTask(executionId, t);
}
};
executeAsyncCallRunnable(runnable,
String.format("Stopping source task %s (%s).", taskNameWithSubtask,
executionId));
@@ -945,7 +960,7 @@ public void run() {
} else {
throw new IllegalStateException(
String.format(
- "Cannot stop task %s (%s) because it is
not yet running.",
+ "Cannot stop task %s (%s) because it is
not running.",
taskNameWithSubtask,
executionId));
}
@@ -1010,6 +1025,10 @@ else if (current == ExecutionState.RUNNING) {
if (transitionState(ExecutionState.RUNNING,
targetState, cause)) {
// we are canceling / failing out of
the running state
// we need to cancel the invokable
+
+ // copy reference to guard against
concurrent null-ing out the reference
+ final AbstractInvokable invokable =
this.invokable;
+
if (invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
notifyObservers(
@@ -1363,9 +1382,9 @@ private void executeAsyncCallRunnable(Runnable runnable,
String callName) {
// Utilities
//
------------------------------------------------------------------------
- private void cancelInvokable() {
+ private void cancelInvokable(AbstractInvokable invokable) {
// in case of an exception during execution, we still call
"cancel()" on the task
- if (invokable != null && this.invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
+ if (invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
try {
invokable.cancel();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index fa675f69e24..d32cad05cdd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -18,21 +18,11 @@
package org.apache.flink.runtime.operators.sort;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -47,9 +37,20 @@
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
public class CombiningUnilateralSortMergerITCase extends TestLogger {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
new file mode 100644
index 00000000000..f5a56bce4e1
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/UnilateralSortMergerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link UnilateralSortMerger}.
+ */
+public class UnilateralSortMergerTest extends TestLogger {
+
+ @Test
+ public void testInMemorySorterDisposal() throws Exception {
+ final TestingInMemorySorterFactory<Tuple2<Integer, Integer>>
inMemorySorterFactory = new TestingInMemorySorterFactory<>();
+
+ final int numPages = 32;
+ final MemoryManager memoryManager = new
MemoryManager(MemoryManager.DEFAULT_PAGE_SIZE * numPages, 1);
+ final DummyInvokable parentTask = new DummyInvokable();
+ final UnilateralSortMerger<Tuple2<Integer, Integer>>
unilateralSortMerger = new UnilateralSortMerger<>(
+ memoryManager,
+ memoryManager.allocatePages(parentTask, numPages),
+ new IOManagerAsync(),
+ EmptyMutableObjectIterator.get(),
+ parentTask,
+ TestData.getIntIntTupleSerializerFactory(),
+ TestData.getIntIntTupleComparator(),
+ 10,
+ 2,
+ 1.0f,
+ true,
+ false,
+ false,
+ inMemorySorterFactory);
+
+ final Collection<TestingInMemorySorter<?>> inMemorySorters =
inMemorySorterFactory.getInMemorySorters();
+
+ assertThat(inMemorySorters, is(not(empty())));
+
+ unilateralSortMerger.close();
+
+ assertThat(unilateralSortMerger.closed, is(true));
+
+ for (TestingInMemorySorter<?> inMemorySorter : inMemorySorters)
{
+ assertThat(inMemorySorter.isDisposed(), is(true));
+ }
+ }
+
+ private static final class TestingInMemorySorterFactory<T> implements
InMemorySorterFactory<T> {
+
+ private final Collection<TestingInMemorySorter<?>>
inMemorySorters = new ArrayList<>(10);
+
+ Collection<TestingInMemorySorter<?>> getInMemorySorters() {
+ return inMemorySorters;
+ }
+
+ @Override
+ public InMemorySorter<T> create(List<MemorySegment>
sortSegments) {
+ final TestingInMemorySorter<T> testingInMemorySorter =
new TestingInMemorySorter<>();
+ inMemorySorters.add(testingInMemorySorter);
+ return testingInMemorySorter;
+ }
+ }
+
+ private static final class TestingInMemorySorter<T> implements
InMemorySorter<T> {
+
+ private volatile boolean isDisposed;
+
+ public boolean isDisposed() {
+ return isDisposed;
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return false;
+ }
+
+ @Override
+ public void dispose() {
+ isDisposed = true;
+ }
+
+ @Override
+ public long getCapacity() {
+ return 0;
+ }
+
+ @Override
+ public long getOccupancy() {
+ return 0;
+ }
+
+ @Override
+ public T getRecord(int logicalPosition) throws IOException {
+ return null;
+ }
+
+ @Override
+ public T getRecord(T reuse, int logicalPosition) throws
IOException {
+ return null;
+ }
+
+ @Override
+ public boolean write(T record) throws IOException {
+ return false;
+ }
+
+ @Override
+ public MutableObjectIterator<T> getIterator() {
+ return null;
+ }
+
+ @Override
+ public void writeToOutput(ChannelWriterOutputView output)
throws IOException {
+
+ }
+
+ @Override
+ public void writeToOutput(ChannelWriterOutputView output,
LargeRecordHandler<T> largeRecordsOutput) throws IOException {
+
+ }
+
+ @Override
+ public void writeToOutput(ChannelWriterOutputView output, int
start, int num) throws IOException {
+
+ }
+
+ @Override
+ public int compare(int i, int j) {
+ return 0;
+ }
+
+ @Override
+ public int compare(int segmentNumberI, int segmentOffsetI, int
segmentNumberJ, int segmentOffsetJ) {
+ return 0;
+ }
+
+ @Override
+ public void swap(int i, int j) {
+
+ }
+
+ @Override
+ public void swap(int segmentNumberI, int segmentOffsetI, int
segmentNumberJ, int segmentOffsetJ) {
+
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public int recordSize() {
+ return 0;
+ }
+
+ @Override
+ public int recordsPerSegment() {
+ return 0;
+ }
+ }
+
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1829e977489..3dfcfb3b059 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -174,6 +174,7 @@ public void testRegularExecution() {
assertEquals(ExecutionState.FINISHED,
task.getExecutionState());
assertFalse(task.isCanceledOrFailed());
assertNull(task.getFailureCause());
+ assertNull(task.getInvokable());
// verify listener messages
validateListenerMessage(ExecutionState.RUNNING, task,
false);
@@ -202,6 +203,8 @@ public void testCancelRightAway() {
// verify final state
assertEquals(ExecutionState.CANCELED,
task.getExecutionState());
validateUnregisterTask(task.getExecutionId());
+
+ assertNull(task.getInvokable());
}
catch (Exception e) {
e.printStackTrace();
@@ -258,6 +261,8 @@ public void testLibraryCacheRegistrationFailed() {
// make sure that the TaskManager received an message
to unregister the task
validateUnregisterTask(task.getExecutionId());
+
+ assertNull(task.getInvokable());
}
catch (Exception e) {
e.printStackTrace();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Unreasonable memory requirements to complete examples/batch/WordCount
> ---------------------------------------------------------------------
>
> Key: FLINK-9969
> URL: https://issues.apache.org/jira/browse/FLINK-9969
> Project: Flink
> Issue Type: Bug
> Components: ResourceManager
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
> Reporter: Piotr Nowojski
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
> Attachments: yarn_logs
>
>
> setup on AWS EMR:
> * 5 worker nodes (m4.4xlarge nodes)
> * 1 master node (m4.large)
> following command fails with out of memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000
> examples/batch/WordCount.jar{noformat}
> Only increasing memory over 17.2GB example completes. At the same time after
> disabling flip6 following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000
> examples/batch/WordCount.jar{noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)