[hotfix] Fix access to temp file directories in SpillingAdaptiveSpanningRecordDeserializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf256c7f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf256c7f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf256c7f Branch: refs/heads/master Commit: bf256c7fbe05accdadc8470013879f567341d1aa Parents: 5a7f4e3 Author: Stephan Ewen <se...@apache.org> Authored: Wed May 25 17:04:31 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu May 26 21:15:20 2016 +0200 ---------------------------------------------------------------------- .../api/common/functions/RuntimeContext.java | 2 +- .../org/apache/flink/metrics/MetricGroup.java | 6 +-- .../clusterframework/types/ResourceID.java | 34 +++++++-------- .../runtime/io/disk/iomanager/IOManager.java | 15 ++++++- .../api/reader/AbstractRecordReader.java | 12 +++++- .../network/api/reader/MutableRecordReader.java | 12 +++++- .../io/network/api/reader/RecordReader.java | 17 +++++--- ...llingAdaptiveSpanningRecordDeserializer.java | 16 ++----- .../task/IterationSynchronizationSinkTask.java | 6 ++- .../flink/runtime/operators/BatchTask.java | 18 +++++--- .../flink/runtime/operators/DataSinkTask.java | 8 +++- .../taskmanager/TaskManagerRuntimeInfo.java | 35 ++++++++++++++-- .../flink/runtime/taskmanager/TaskManager.scala | 3 +- .../SpanningRecordSerializationTest.java | 4 +- .../network/serialization/LargeRecordsTest.java | 5 ++- .../SlotCountExceedingParallelismTest.java | 5 ++- .../operators/drivers/TestTaskContext.java | 3 +- .../testutils/BinaryOperatorTestBase.java | 3 +- .../operators/testutils/DriverTestBase.java | 3 +- .../operators/testutils/MockEnvironment.java | 5 ++- .../testutils/UnaryOperatorTestBase.java | 3 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 2 +- .../runtime/taskmanager/TaskCancelTest.java | 10 +++-- .../flink/runtime/taskmanager/TaskTest.java | 2 +- .../apache/flink/runtime/jobmanager/Tasks.scala | 44 ++++++++++++++++---- .../runtime/io/StreamInputProcessor.java | 3 +- .../runtime/io/StreamTwoInputProcessor.java | 3 +- .../runtime/tasks/StreamMockEnvironment.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 2 +- .../runtime/NetworkStackThroughputITCase.java | 20 ++++++--- 30 files changed, 212 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index ed2f613..9a04b24 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -63,7 +63,7 @@ public interface RuntimeContext { * Returns the metric group for this parallel subtask. * * @return The metric group for this parallel subtask. - */ + */ MetricGroup getMetricGroup(); /** http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java index a3832ff..6c9e044 100644 --- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java +++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java @@ -21,14 +21,14 @@ package org.apache.flink.metrics; import org.apache.flink.annotation.PublicEvolving; /** - * A MetricGroup is a named container for {@link Metric Metrics} and {@link MetricGroup MetricGroups}. + * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups. * * <p>Instances of this class can be used to register new metrics with Flink and to create a nested * hierarchy based on the group names. * * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name. * - * <p>Metrics groups can be {@link #close() closed}. Upon closing, they de-register all metrics + * <p>Metrics groups can be {@link #close() closed}. Upon closing, the group de-register all metrics * from any metrics reporter and any internal maps. Note that even closed metrics groups * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code. * These metrics simply do not get reported any more, when created on a closed group. @@ -39,7 +39,7 @@ public interface MetricGroup { // ------------------------------------------------------------------------ // Closing // ------------------------------------------------------------------------ - + /** * Marks the group as closed. * Recursively unregisters all {@link Metric Metrics} contained in this group. http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java index e599456..9d82c76 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java @@ -45,23 +45,11 @@ public class ResourceID implements Serializable { return resourceId; } - /** - * Generate a random resource id. - * @return A random resource id. - */ - public static ResourceID generate() { - return new ResourceID(new AbstractID().toString()); - } - @Override public final boolean equals(Object o) { - if (this == o) { - return true; - } else if (o == null || !(o instanceof ResourceID)) { - return false; - } else { - return resourceId.equals(((ResourceID) o).resourceId); - } + return this == o || + (o != null && o.getClass() == ResourceID.class && + this.resourceId.equals(((ResourceID) o).resourceId)); } @Override @@ -71,8 +59,18 @@ public class ResourceID implements Serializable { @Override public String toString() { - return "ResourceID{" + - "resourceId='" + resourceId + '\'' + - '}'; + return "ResourceID (" + resourceId + ')'; + } + + // ------------------------------------------------------------------------ + // factory + // ------------------------------------------------------------------------ + + /** + * Generate a random resource id. + * @return A random resource id. + */ + public static ResourceID generate() { + return new ResourceID(new AbstractID().toString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 0942f72..7904cc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager; import org.apache.commons.io.FileUtils; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -281,7 +282,19 @@ public abstract class IOManager { public File[] getSpillingDirectories() { return this.paths; } - + + /** + * Gets the directories that the I/O manager spills to, as path strings. + * + * @return The directories that the I/O manager spills to, as path strings. + */ + public String[] getSpillingDirectoriesPaths() { + String[] strings = new String[this.paths.length]; + for (int i = 0; i < strings.length; i++) { + strings[i] = paths[i].getAbsolutePath(); + } + return strings; + } protected int getNextPathNum() { final int next = this.nextPath; http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index a784f54..48ac558 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -45,14 +45,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra private boolean isFinished; + /** + * Creates a new AbstractRecordReader that de-serializes records from the given input gate and + * can spill partial records to disk, if they grow large. + * + * @param inputGate The input gate to read from. + * @param tmpDirectories The temp directories. USed for spilling if the reader concurrently + * reconstructs multiple large records. + */ @SuppressWarnings("unchecked") - protected AbstractRecordReader(InputGate inputGate) { + protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) { super(inputGate); // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { - recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java index d7cc7e9..9836ba4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java @@ -25,8 +25,16 @@ import java.io.IOException; public class MutableRecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements MutableReader<T> { - public MutableRecordReader(InputGate inputGate) { - super(inputGate); + /** + * Creates a new MutableRecordReader that de-serializes records from the given input gate and + * can spill partial records to disk, if they grow large. + * + * @param inputGate The input gate to read from. + * @param tmpDirectories The temp directories. USed for spilling if the reader concurrently + * reconstructs multiple large records. + */ + public MutableRecordReader(InputGate inputGate, String[] tmpDirectories) { + super(inputGate, tmpDirectories); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java index d45920e..9eed374 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java @@ -29,8 +29,16 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe private T currentRecord; - public RecordReader(InputGate inputGate, Class<T> recordType) { - super(inputGate); + /** + * Creates a new RecordReader that de-serializes records from the given input gate and + * can spill partial records to disk, if they grow large. + * + * @param inputGate The input gate to read from. + * @param tmpDirectories The temp directories. USed for spilling if the reader concurrently + * reconstructs multiple large records. + */ + public RecordReader(InputGate inputGate, Class<T> recordType, String[] tmpDirectories) { + super(inputGate, tmpDirectories); this.recordType = recordType; } @@ -73,10 +81,7 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe try { return recordType.newInstance(); } - catch (InstantiationException e) { - throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e); - } - catch (IllegalAccessException e) { + catch (InstantiationException | IllegalAccessException e) { throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 49f7584..7e96390 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.api.serialization; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -65,18 +63,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit private AccumulatorRegistry.Reporter reporter; - private transient Counter numRecordsIn; - private transient Counter numBytesIn; + private Counter numRecordsIn; + private Counter numBytesIn; - public SpillingAdaptiveSpanningRecordDeserializer() { - - String tempDirString = GlobalConfiguration.getString( - ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH); - String[] directories = tempDirString.split(",|" + File.pathSeparator); - + public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) { this.nonSpanningWrapper = new NonSpanningWrapper(); - this.spanningWrapper = new SpanningWrapper(directories); + this.spanningWrapper = new SpanningWrapper(tmpDirectories); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java index 2b710d2..f1ab93f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java @@ -72,7 +72,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen @Override public void invoke() throws Exception { - this.headEventReader = new MutableRecordReader<IntValue>(getEnvironment().getInputGate(0)); + this.headEventReader = new MutableRecordReader<IntValue>( + getEnvironment().getInputGate(0), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); TaskConfig taskConfig = new TaskConfig(getTaskConfiguration()); @@ -184,7 +186,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen // read (and thereby process all events in the handler's event handling functions) try { - while (this.headEventReader.next(rec)) { + if (this.headEventReader.next(rec)) { throw new RuntimeException("Synchronization task must not see any records!"); } } catch (InterruptedException iex) { http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index 546193c..f38b988 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -659,14 +659,18 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme if (groupSize == 1) { // non-union case - inputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset)); + inputReaders[i] = new MutableRecordReader<IOReadableWritable>( + getEnvironment().getInputGate(currentReaderOffset), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); } else if (groupSize > 1){ // union case InputGate[] readers = new InputGate[groupSize]; for (int j = 0; j < groupSize; ++j) { readers[j] = getEnvironment().getInputGate(currentReaderOffset + j); } - inputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers)); + inputReaders[i] = new MutableRecordReader<IOReadableWritable>( + new UnionInputGate(readers), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); } else { throw new Exception("Illegal input group size in task configuration: " + groupSize); } @@ -701,14 +705,18 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme final int groupSize = this.config.getBroadcastGroupSize(i); if (groupSize == 1) { // non-union case - broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset)); + broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>( + getEnvironment().getInputGate(currentReaderOffset), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); } else if (groupSize > 1){ // union case InputGate[] readers = new InputGate[groupSize]; for (int j = 0; j < groupSize; ++j) { readers[j] = getEnvironment().getInputGate(currentReaderOffset + j); } - broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers)); + broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>( + new UnionInputGate(readers), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); } else { throw new Exception("Illegal input group size in task configuration: " + groupSize); } @@ -765,8 +773,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme * * NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and * {@code #initInputSerializersAndComparators(int)}! - * - * @param numInputs */ protected void initLocalStrategies(int numInputs) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 39bf23f..380edd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -332,10 +332,14 @@ public class DataSinkTask<IT> extends AbstractInvokable { numGates += groupSize; if (groupSize == 1) { // non-union case - inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(getEnvironment().getInputGate(0)); + inputReader = new MutableRecordReader<DeserializationDelegate<IT>>( + getEnvironment().getInputGate(0), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); } else if (groupSize > 1){ // union case - inputReader = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(getEnvironment().getAllInputGates())); + inputReader = new MutableRecordReader<IOReadableWritable>( + new UnionInputGate(getEnvironment().getAllInputGates()), + getEnvironment().getTaskManagerInfo().getTmpDirectories()); } else { throw new Exception("Illegal input group size in task configuration: " + groupSize); } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java index 8d06f10..9ac982e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java @@ -20,6 +20,9 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.configuration.Configuration; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Encapsulation of TaskManager runtime information, like hostname and configuration. */ @@ -33,14 +36,32 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable { /** configuration that the TaskManager was started with */ private final Configuration configuration; + /** list of temporary file directories */ + private final String[] tmpDirectories; + /** * Creates a runtime info. + * * @param hostname The host name of the interface that the TaskManager uses to communicate. * @param configuration The configuration that the TaskManager was started with. + * @param tmpDirectory The temporary file directory. */ - public TaskManagerRuntimeInfo(String hostname, Configuration configuration) { - this.hostname = hostname; - this.configuration = configuration; + public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) { + this(hostname, configuration, new String[] { tmpDirectory }); + } + + /** + * Creates a runtime info. + * @param hostname The host name of the interface that the TaskManager uses to communicate. + * @param configuration The configuration that the TaskManager was started with. + * @param tmpDirectories The list of temporary file directories. + */ + public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) { + checkArgument(tmpDirectories.length > 0); + this.hostname = checkNotNull(hostname); + this.configuration = checkNotNull(configuration); + this.tmpDirectories = tmpDirectories; + } /** @@ -58,4 +79,12 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable { public Configuration getConfiguration() { return configuration; } + + /** + * Gets the list of temporary file directories. + * @return The list of temporary file directories. + */ + public String[] getTmpDirectories() { + return tmpDirectories; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a5cc18d..eb7a0ef 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -187,7 +187,8 @@ class TaskManager( private val runtimeInfo = new TaskManagerRuntimeInfo( connectionInfo.getHostname(), - new UnmodifiableConfiguration(config.configuration)) + new UnmodifiableConfiguration(config.configuration), + config.tmpDirPaths) // -------------------------------------------------------------------------- // Actor messages and life cycle // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index 819a94f..9d0ee67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -106,7 +106,9 @@ public class SpanningRecordSerializationTest { private void testSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception { RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(); + RecordDeserializer<SerializationTestType> deserializer = + new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>( + new String[] { System.getProperty("java.io.tmpdir") }); test(records, segmentSize, serializer, deserializer); } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java index d628596..1574fe9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java @@ -147,7 +147,10 @@ public class LargeRecordsTest { final int SEGMENT_SIZE = 32 * 1024; final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(); + + final RecordDeserializer<SerializationTestType> deserializer = + new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>( + new String[] { System.getProperty("java.io.tmpdir") } ); final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 561bda3..e12faf9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmanager; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.api.reader.RecordReader; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.types.IntValue; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -152,7 +152,8 @@ public class SlotCountExceedingParallelismTest { public void invoke() throws Exception { RecordReader<IntValue> reader = new RecordReader<>( getEnvironment().getInputGate(0), - IntValue.class); + IntValue.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); try { final int numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 0300a07..15ad353 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -74,7 +74,8 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> { public TestTaskContext(long memoryInBytes) { this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 1024, MemoryType.HEAP, true); - this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration()); + this.taskManageInfo = new TaskManagerRuntimeInfo( + "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 2c3dcf1..6e9b817 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -110,7 +110,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog this.owner = new DummyInvokable(); this.taskConfig = new TaskConfig(new Configuration()); this.executionConfig = executionConfig; - this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration()); + this.taskManageInfo = new TaskManagerRuntimeInfo( + "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 6381733..eb2a3a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -114,7 +114,8 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta this.owner = new DummyInvokable(); this.taskConfig = new TaskConfig(new Configuration()); this.executionConfig = executionConfig; - this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration()); + this.taskManageInfo = new TaskManagerRuntimeInfo( + "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 31fd08c..b774b48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -208,7 +208,10 @@ public class MockEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration())); + return new TaskManagerRuntimeInfo( + "localhost", + new UnmodifiableConfiguration(new Configuration()), + System.getProperty("java.io.tmpdir")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 77e18c6..50bb1ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -115,7 +115,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg this.executionConfig = executionConfig; this.comparators = new ArrayList<TypeComparator<IN>>(2); - this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration()); + this.taskManageInfo = new TaskManagerRuntimeInfo( + "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index ae05ae9..e1f551c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -171,7 +171,7 @@ public class TaskAsyncCallTest { new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration()), + new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java index 09dd817..fc5a4a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -41,7 +40,9 @@ import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.types.IntValue; + import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -247,10 +248,11 @@ public class TaskCancelTest { @Override public void invoke() throws Exception { UnionInputGate union = new UnionInputGate(getEnvironment().getAllInputGates()); - RecordReader<IntValue> reader = new RecordReader<>(union, IntValue.class); + RecordReader<IntValue> reader = new RecordReader<>( + union, IntValue.class, getEnvironment().getTaskManagerInfo().getTmpDirectories()); - while (reader.next() != null) { - } + //noinspection StatementWithEmptyBody + while (reader.next() != null) {} } } } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index f237c87..1762a7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -623,7 +623,7 @@ public class TaskTest { new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration()), + new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index d871c3d..87c123a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -64,7 +64,11 @@ object Tasks { class Forwarder extends AbstractInvokable { override def invoke(): Unit = { - val reader = new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue]) + val reader = new RecordReader[IntValue]( + getEnvironment.getInputGate(0), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) + val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0)) try { @@ -88,7 +92,10 @@ object Tasks { class Receiver extends AbstractInvokable { override def invoke(): Unit = { - val reader = new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue]) + val reader = new RecordReader[IntValue]( + getEnvironment.getInputGate(0), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) val i1 = reader.next() val i2 = reader.next() @@ -140,7 +147,10 @@ object Tasks { class AgnosticReceiver extends AbstractInvokable { override def invoke(): Unit = { - val reader= new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue]) + val reader= new RecordReader[IntValue]( + getEnvironment.getInputGate(0), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) while(reader.next() != null){} } @@ -149,8 +159,15 @@ object Tasks { class AgnosticBinaryReceiver extends AbstractInvokable { override def invoke(): Unit = { - val reader1 = new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue]) - val reader2 = new RecordReader[IntValue](getEnvironment.getInputGate(1), classOf[IntValue]) + val reader1 = new RecordReader[IntValue]( + getEnvironment.getInputGate(0), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) + + val reader2 = new RecordReader[IntValue]( + getEnvironment.getInputGate(1), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) while(reader1.next() != null){} while(reader2.next() != null){} @@ -162,9 +179,20 @@ object Tasks { override def invoke(): Unit = { val env = getEnvironment - val reader1 = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue]) - val reader2 = new RecordReader[IntValue](env.getInputGate(1), classOf[IntValue]) - val reader3 = new RecordReader[IntValue](env.getInputGate(2), classOf[IntValue]) + val reader1 = new RecordReader[IntValue]( + env.getInputGate(0), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) + + val reader2 = new RecordReader[IntValue]( + env.getInputGate(1), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) + + val reader3 = new RecordReader[IntValue]( + env.getInputGate(2), + classOf[IntValue], + getEnvironment.getTaskManagerInfo.getTmpDirectories) while(reader1.next() != null){} while(reader2.next() != null){} http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 1dde85b..ab69ab7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -117,7 +117,8 @@ public class StreamInputProcessor<IN> { this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { - recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>(); + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>( + ioManager.getSpillingDirectoriesPaths()); } watermarks = new long[inputGate.getNumberOfInputChannels()]; http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 07ada23..733e7fb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -139,7 +139,8 @@ public class StreamTwoInputProcessor<IN1, IN2> { this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { - recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>(); + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>( + ioManager.getSpillingDirectoriesPaths()); } // determine which unioned channels belong to input 1 and which belong to input 2 http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index c62c881..b2d0196 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.metrics.groups.TaskMetricGroup; @@ -50,6 +49,7 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -303,7 +303,7 @@ public class StreamMockEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration())); + return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index deda82f..f054e18 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -158,7 +158,7 @@ public class StreamTaskTest { new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration()), + new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index 06df46f..5506f55 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -18,27 +18,28 @@ package org.apache.flink.test.runtime; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.io.network.api.reader.RecordReader; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.test.util.JavaProgramTestBase; + import org.junit.Ignore; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.test.util.JavaProgramTestBase; @Ignore public class NetworkStackThroughputITCase { @@ -195,7 +196,11 @@ public class NetworkStackThroughputITCase { @Override public void invoke() throws Exception { - RecordReader<SpeedTestRecord> reader = new RecordReader<>(getEnvironment().getInputGate(0), SpeedTestRecord.class); + RecordReader<SpeedTestRecord> reader = new RecordReader<>( + getEnvironment().getInputGate(0), + SpeedTestRecord.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); + RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0)); try { @@ -215,7 +220,10 @@ public class NetworkStackThroughputITCase { @Override public void invoke() throws Exception { - RecordReader<SpeedTestRecord> reader = new RecordReader<>(getEnvironment().getInputGate(0), SpeedTestRecord.class); + RecordReader<SpeedTestRecord> reader = new RecordReader<>( + getEnvironment().getInputGate(0), + SpeedTestRecord.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); try { boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);