[hotfix] Fix access to temp file directories in 
SpillingAdaptiveSpanningRecordDeserializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf256c7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf256c7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf256c7f

Branch: refs/heads/master
Commit: bf256c7fbe05accdadc8470013879f567341d1aa
Parents: 5a7f4e3
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 25 17:04:31 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 26 21:15:20 2016 +0200

----------------------------------------------------------------------
 .../api/common/functions/RuntimeContext.java    |  2 +-
 .../org/apache/flink/metrics/MetricGroup.java   |  6 +--
 .../clusterframework/types/ResourceID.java      | 34 +++++++--------
 .../runtime/io/disk/iomanager/IOManager.java    | 15 ++++++-
 .../api/reader/AbstractRecordReader.java        | 12 +++++-
 .../network/api/reader/MutableRecordReader.java | 12 +++++-
 .../io/network/api/reader/RecordReader.java     | 17 +++++---
 ...llingAdaptiveSpanningRecordDeserializer.java | 16 ++-----
 .../task/IterationSynchronizationSinkTask.java  |  6 ++-
 .../flink/runtime/operators/BatchTask.java      | 18 +++++---
 .../flink/runtime/operators/DataSinkTask.java   |  8 +++-
 .../taskmanager/TaskManagerRuntimeInfo.java     | 35 ++++++++++++++--
 .../flink/runtime/taskmanager/TaskManager.scala |  3 +-
 .../SpanningRecordSerializationTest.java        |  4 +-
 .../network/serialization/LargeRecordsTest.java |  5 ++-
 .../SlotCountExceedingParallelismTest.java      |  5 ++-
 .../operators/drivers/TestTaskContext.java      |  3 +-
 .../testutils/BinaryOperatorTestBase.java       |  3 +-
 .../operators/testutils/DriverTestBase.java     |  3 +-
 .../operators/testutils/MockEnvironment.java    |  5 ++-
 .../testutils/UnaryOperatorTestBase.java        |  3 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  2 +-
 .../runtime/taskmanager/TaskCancelTest.java     | 10 +++--
 .../flink/runtime/taskmanager/TaskTest.java     |  2 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala | 44 ++++++++++++++++----
 .../runtime/io/StreamInputProcessor.java        |  3 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  3 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  2 +-
 .../runtime/NetworkStackThroughputITCase.java   | 20 ++++++---
 30 files changed, 212 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index ed2f613..9a04b24 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -63,7 +63,7 @@ public interface RuntimeContext {
         * Returns the metric group for this parallel subtask.
         * 
         * @return The metric group for this parallel subtask.
-     */
+        */
        MetricGroup getMetricGroup();
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index a3832ff..6c9e044 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -21,14 +21,14 @@ package org.apache.flink.metrics;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * A MetricGroup is a named container for {@link Metric Metrics} and {@link 
MetricGroup MetricGroups}.
+ * A MetricGroup is a named container for {@link Metric Metrics} and further 
metric subgroups.
  * 
  * <p>Instances of this class can be used to register new metrics with Flink 
and to create a nested
  * hierarchy based on the group names.
  * 
  * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and 
name.
  * 
- * <p>Metrics groups can be {@link #close() closed}. Upon closing, they 
de-register all metrics
+ * <p>Metrics groups can be {@link #close() closed}. Upon closing, the group 
de-registers all metrics
  * from any metrics reporter and any internal maps. Note that even closed 
metrics groups
  * return Counters, Gauges, etc to the code, to prevent exceptions in the 
monitored code.
  * These metrics simply do not get reported any more, when created on a closed 
group.
@@ -39,7 +39,7 @@ public interface MetricGroup {
        // 
------------------------------------------------------------------------
        //  Closing
        // 
------------------------------------------------------------------------
-       
+
        /**
         * Marks the group as closed.
         * Recursively unregisters all {@link Metric Metrics} contained in this 
group.

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index e599456..9d82c76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -45,23 +45,11 @@ public class ResourceID implements Serializable {
                return resourceId;
        }
 
-       /**
-        * Generate a random resource id.
-        * @return A random resource id.
-        */
-       public static ResourceID generate() {
-               return new ResourceID(new AbstractID().toString());
-       }
-
        @Override
        public final boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               } else if (o == null || !(o instanceof ResourceID)) {
-                       return false;
-               } else {
-                       return resourceId.equals(((ResourceID) o).resourceId);
-               }
+               return this == o ||
+                               (o != null && o.getClass() == ResourceID.class 
&& 
+                                       this.resourceId.equals(((ResourceID) 
o).resourceId));
        }
 
        @Override
@@ -71,8 +59,18 @@ public class ResourceID implements Serializable {
 
        @Override
        public String toString() {
-               return "ResourceID{" +
-                       "resourceId='" + resourceId + '\'' +
-                       '}';
+               return "ResourceID (" + resourceId + ')';
+       }
+
+       // 
------------------------------------------------------------------------
+       //  factory
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Generate a random resource id.
+        * @return A random resource id.
+        */
+       public static ResourceID generate() {
+               return new ResourceID(new AbstractID().toString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 0942f72..7904cc4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -281,7 +282,19 @@ public abstract class IOManager {
        public File[] getSpillingDirectories() {
                return this.paths;
        }
-       
+
+       /**
+        * Gets the directories that the I/O manager spills to, as path strings.
+        *
+        * @return The directories that the I/O manager spills to, as path 
strings.
+        */
+       public String[] getSpillingDirectoriesPaths() {
+               String[] strings = new String[this.paths.length];
+               for (int i = 0; i < strings.length; i++) {
+                       strings[i] = paths[i].getAbsolutePath();
+               }
+               return strings;
+       }
        
        protected int getNextPathNum() {
                final int next = this.nextPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index a784f54..48ac558 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -45,14 +45,22 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
 
        private boolean isFinished;
 
+       /**
+        * Creates a new AbstractRecordReader that de-serializes records from 
the given input gate and
+        * can spill partial records to disk, if they grow large.
+        *
+        * @param inputGate The input gate to read from.
+        * @param tmpDirectories The temp directories. Used for spilling if the 
reader concurrently
+        *                       reconstructs multiple large records.
+        */
        @SuppressWarnings("unchecked")
-       protected AbstractRecordReader(InputGate inputGate) {
+       protected AbstractRecordReader(InputGate inputGate, String[] 
tmpDirectories) {
                super(inputGate);
 
                // Initialize one deserializer per input channel
                this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
                for (int i = 0; i < recordDeserializers.length; i++) {
-                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<T>();
+                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
index d7cc7e9..9836ba4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
@@ -25,8 +25,16 @@ import java.io.IOException;
 
 public class MutableRecordReader<T extends IOReadableWritable> extends 
AbstractRecordReader<T> implements MutableReader<T> {
 
-       public MutableRecordReader(InputGate inputGate) {
-               super(inputGate);
+       /**
+        * Creates a new MutableRecordReader that de-serializes records from 
the given input gate and
+        * can spill partial records to disk, if they grow large.
+        * 
+        * @param inputGate The input gate to read from.
+        * @param tmpDirectories The temp directories. Used for spilling if the 
reader concurrently
+        *                       reconstructs multiple large records.
+        */
+       public MutableRecordReader(InputGate inputGate, String[] 
tmpDirectories) {
+               super(inputGate, tmpDirectories);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
index d45920e..9eed374 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
@@ -29,8 +29,16 @@ public class RecordReader<T extends IOReadableWritable> 
extends AbstractRecordRe
 
        private T currentRecord;
 
-       public RecordReader(InputGate inputGate, Class<T> recordType) {
-               super(inputGate);
+       /**
+        * Creates a new RecordReader that de-serializes records from the given 
input gate and
+        * can spill partial records to disk, if they grow large.
+        *
+        * @param inputGate The input gate to read from.
+        * @param tmpDirectories The temp directories. Used for spilling if the 
reader concurrently
+        *                       reconstructs multiple large records.
+        */
+       public RecordReader(InputGate inputGate, Class<T> recordType, String[] 
tmpDirectories) {
+               super(inputGate, tmpDirectories);
 
                this.recordType = recordType;
        }
@@ -73,10 +81,7 @@ public class RecordReader<T extends IOReadableWritable> 
extends AbstractRecordRe
                try {
                        return recordType.newInstance();
                }
-               catch (InstantiationException e) {
-                       throw new RuntimeException("Cannot instantiate class " 
+ recordType.getName(), e);
-               }
-               catch (IllegalAccessException e) {
+               catch (InstantiationException | IllegalAccessException e) {
                        throw new RuntimeException("Cannot instantiate class " 
+ recordType.getName(), e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 49f7584..7e96390 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -65,18 +63,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
 
        private AccumulatorRegistry.Reporter reporter;
 
-       private transient Counter numRecordsIn;
-       private transient Counter numBytesIn;
+       private Counter numRecordsIn;
+       private Counter numBytesIn;
 
-       public SpillingAdaptiveSpanningRecordDeserializer() {
-               
-               String tempDirString = GlobalConfiguration.getString(
-                               ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-                               ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
-               String[] directories = tempDirString.split(",|" + 
File.pathSeparator);
-               
+       public SpillingAdaptiveSpanningRecordDeserializer(String[] 
tmpDirectories) {
                this.nonSpanningWrapper = new NonSpanningWrapper();
-               this.spanningWrapper = new SpanningWrapper(directories);
+               this.spanningWrapper = new SpanningWrapper(tmpDirectories);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 2b710d2..f1ab93f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -72,7 +72,9 @@ public class IterationSynchronizationSinkTask extends 
AbstractInvokable implemen
        
        @Override
        public void invoke() throws Exception {
-               this.headEventReader = new 
MutableRecordReader<IntValue>(getEnvironment().getInputGate(0));
+               this.headEventReader = new MutableRecordReader<IntValue>(
+                               getEnvironment().getInputGate(0),
+                               
getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
                TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());
                
@@ -184,7 +186,7 @@ public class IterationSynchronizationSinkTask extends 
AbstractInvokable implemen
                
                // read (and thereby process all events in the handler's event 
handling functions)
                try {
-                       while (this.headEventReader.next(rec)) {
+                       if (this.headEventReader.next(rec)) {
                                throw new RuntimeException("Synchronization 
task must not see any records!");
                        }
                } catch (InterruptedException iex) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 546193c..f38b988 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -659,14 +659,18 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
 
                        if (groupSize == 1) {
                                // non-union case
-                               inputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
+                               inputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(
+                                               
getEnvironment().getInputGate(currentReaderOffset),
+                                               
getEnvironment().getTaskManagerInfo().getTmpDirectories());
                        } else if (groupSize > 1){
                                // union case
                                InputGate[] readers = new InputGate[groupSize];
                                for (int j = 0; j < groupSize; ++j) {
                                        readers[j] = 
getEnvironment().getInputGate(currentReaderOffset + j);
                                }
-                               inputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
+                               inputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(
+                                               new UnionInputGate(readers),
+                                               
getEnvironment().getTaskManagerInfo().getTmpDirectories());
                        } else {
                                throw new Exception("Illegal input group size 
in task configuration: " + groupSize);
                        }
@@ -701,14 +705,18 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                        final int groupSize = 
this.config.getBroadcastGroupSize(i);
                        if (groupSize == 1) {
                                // non-union case
-                               broadcastInputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
+                               broadcastInputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(
+                                               
getEnvironment().getInputGate(currentReaderOffset),
+                                               
getEnvironment().getTaskManagerInfo().getTmpDirectories());
                        } else if (groupSize > 1){
                                // union case
                                InputGate[] readers = new InputGate[groupSize];
                                for (int j = 0; j < groupSize; ++j) {
                                        readers[j] = 
getEnvironment().getInputGate(currentReaderOffset + j);
                                }
-                               broadcastInputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
+                               broadcastInputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(
+                                               new UnionInputGate(readers),
+                                               
getEnvironment().getTaskManagerInfo().getTmpDirectories());
                        } else {
                                throw new Exception("Illegal input group size 
in task configuration: " + groupSize);
                        }
@@ -765,8 +773,6 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
         *
         * NOTE: This method must be invoked after the invocation of {@code 
#initInputReaders()} and
         * {@code #initInputSerializersAndComparators(int)}!
-        *
-        * @param numInputs
         */
        protected void initLocalStrategies(int numInputs) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 39bf23f..380edd4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -332,10 +332,14 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                numGates += groupSize;
                if (groupSize == 1) {
                        // non-union case
-                       inputReader = new 
MutableRecordReader<DeserializationDelegate<IT>>(getEnvironment().getInputGate(0));
+                       inputReader = new 
MutableRecordReader<DeserializationDelegate<IT>>(
+                                       getEnvironment().getInputGate(0),
+                                       
getEnvironment().getTaskManagerInfo().getTmpDirectories());
                } else if (groupSize > 1){
                        // union case
-                       inputReader = new 
MutableRecordReader<IOReadableWritable>(new 
UnionInputGate(getEnvironment().getAllInputGates()));
+                       inputReader = new 
MutableRecordReader<IOReadableWritable>(
+                                       new 
UnionInputGate(getEnvironment().getAllInputGates()),
+                                       
getEnvironment().getTaskManagerInfo().getTmpDirectories());
                } else {
                        throw new Exception("Illegal input group size in task 
configuration: " + groupSize);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 8d06f10..9ac982e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.Configuration;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Encapsulation of TaskManager runtime information, like hostname and 
configuration.
  */
@@ -33,14 +36,32 @@ public class TaskManagerRuntimeInfo implements 
java.io.Serializable {
        /** configuration that the TaskManager was started with */
        private final Configuration configuration;
 
+       /** list of temporary file directories */
+       private final String[] tmpDirectories;
+       
        /**
         * Creates a runtime info.
+        * 
         * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
         * @param configuration The configuration that the TaskManager was 
started with.
+        * @param tmpDirectory The temporary file directory.   
         */
-       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration) {
-               this.hostname = hostname;
-               this.configuration = configuration;
+       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String tmpDirectory) {
+               this(hostname, configuration, new String[] { tmpDirectory });
+       }
+       
+       /**
+        * Creates a runtime info.
+        * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
+        * @param configuration The configuration that the TaskManager was 
started with.
+        * @param tmpDirectories The list of temporary file directories.   
+        */
+       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String[] tmpDirectories) {
+               checkArgument(tmpDirectories.length > 0);
+               this.hostname = checkNotNull(hostname);
+               this.configuration = checkNotNull(configuration);
+               this.tmpDirectories = tmpDirectories;
+               
        }
 
        /**
@@ -58,4 +79,12 @@ public class TaskManagerRuntimeInfo implements 
java.io.Serializable {
        public Configuration getConfiguration() {
                return configuration;
        }
+
+       /**
+        * Gets the list of temporary file directories.
+        * @return The list of temporary file directories.
+        */
+       public String[] getTmpDirectories() {
+               return tmpDirectories;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a5cc18d..eb7a0ef 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -187,7 +187,8 @@ class TaskManager(
 
   private val runtimeInfo = new TaskManagerRuntimeInfo(
        connectionInfo.getHostname(),
-       new UnmodifiableConfiguration(config.configuration))
+       new UnmodifiableConfiguration(config.configuration),
+       config.tmpDirPaths)
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 819a94f..9d0ee67 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -106,7 +106,9 @@ public class SpanningRecordSerializationTest {
        
        private void testSpillingDeserializer(Util.MockRecords records, int 
segmentSize) throws Exception {
                RecordSerializer<SerializationTestType> serializer = new 
SpanningRecordSerializer<SerializationTestType>();
-               RecordDeserializer<SerializationTestType> deserializer = new 
SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
+               RecordDeserializer<SerializationTestType> deserializer = 
+                               new 
SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
+                                               new String[] { 
System.getProperty("java.io.tmpdir") });
                
                test(records, segmentSize, serializer, deserializer);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d628596..1574fe9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -147,7 +147,10 @@ public class LargeRecordsTest {
                        final int SEGMENT_SIZE = 32 * 1024;
 
                        final RecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-                       final RecordDeserializer<SerializationTestType> 
deserializer = new 
SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
+                       
+                       final RecordDeserializer<SerializationTestType> 
deserializer =
+                                       new 
SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
+                                                       new String[] { 
System.getProperty("java.io.tmpdir") } );
 
                        final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), 
mock(BufferRecycler.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 561bda3..e12faf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -152,7 +152,8 @@ public class SlotCountExceedingParallelismTest {
                public void invoke() throws Exception {
                        RecordReader<IntValue> reader = new RecordReader<>(
                                        getEnvironment().getInputGate(0),
-                                       IntValue.class);
+                                       IntValue.class,
+                                       
getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
                        try {
                                final int numberOfSubtaskIndexesToReceive = 
getTaskConfiguration().getInteger(CONFIG_KEY, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 0300a07..15ad353 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -74,7 +74,8 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
        
        public TestTaskContext(long memoryInBytes) {
                this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 
1024, MemoryType.HEAP, true);
-               this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", 
new Configuration());
+               this.taskManageInfo = new TaskManagerRuntimeInfo(
+                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 2c3dcf1..6e9b817 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -110,7 +110,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
                this.owner = new DummyInvokable();
                this.taskConfig = new TaskConfig(new Configuration());
                this.executionConfig = executionConfig;
-               this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", 
new Configuration());
+               this.taskManageInfo = new TaskManagerRuntimeInfo(
+                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
        }
        
        @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 6381733..eb2a3a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -114,7 +114,8 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta
                this.owner = new DummyInvokable();
                this.taskConfig = new TaskConfig(new Configuration());
                this.executionConfig = executionConfig;
-               this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", 
new Configuration());
+               this.taskManageInfo = new TaskManagerRuntimeInfo(
+                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
        }
 
        @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 31fd08c..b774b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -208,7 +208,10 @@ public class MockEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TaskManagerRuntimeInfo("localhost", new 
UnmodifiableConfiguration(new Configuration()));
+               return new TaskManagerRuntimeInfo(
+                               "localhost",
+                               new UnmodifiableConfiguration(new 
Configuration()),
+                               System.getProperty("java.io.tmpdir"));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 77e18c6..50bb1ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -115,7 +115,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
                this.executionConfig = executionConfig;
                this.comparators = new ArrayList<TypeComparator<IN>>(2);
 
-               this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", 
new Configuration());
+               this.taskManageInfo = new TaskManagerRuntimeInfo(
+                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
        }
 
        @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ae05ae9..e1f551c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -171,7 +171,7 @@ public class TaskAsyncCallTest {
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
                                mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", new 
Configuration()),
+                               new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
                                mock(TaskMetricGroup.class));
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 09dd817..fc5a4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -41,7 +40,9 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -247,10 +248,11 @@ public class TaskCancelTest {
                @Override
                public void invoke() throws Exception {
                        UnionInputGate union = new 
UnionInputGate(getEnvironment().getAllInputGates());
-                       RecordReader<IntValue> reader = new 
RecordReader<>(union, IntValue.class);
+                       RecordReader<IntValue> reader = new RecordReader<>(
+                                       union, IntValue.class, 
getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
-                       while (reader.next() != null) {
-                       }
+                       //noinspection StatementWithEmptyBody
+                       while (reader.next() != null) {}
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index f237c87..1762a7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -623,7 +623,7 @@ public class TaskTest {
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
                                mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", new 
Configuration()),
+                               new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
                                mock(TaskMetricGroup.class));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index d871c3d..87c123a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -64,7 +64,11 @@ object Tasks {
   class Forwarder extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader = new RecordReader[IntValue](getEnvironment.getInputGate(0), 
classOf[IntValue])
+      val reader = new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
       val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
 
       try {
@@ -88,7 +92,10 @@ object Tasks {
   class Receiver extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader = new RecordReader[IntValue](getEnvironment.getInputGate(0), 
classOf[IntValue])
+      val reader = new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       val i1 = reader.next()
       val i2 = reader.next()
@@ -140,7 +147,10 @@ object Tasks {
   class AgnosticReceiver extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader= new RecordReader[IntValue](getEnvironment.getInputGate(0), 
classOf[IntValue])
+      val reader= new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       while(reader.next() != null){}
     }
@@ -149,8 +159,15 @@ object Tasks {
   class AgnosticBinaryReceiver extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader1 = new RecordReader[IntValue](getEnvironment.getInputGate(0), 
classOf[IntValue])
-      val reader2 = new RecordReader[IntValue](getEnvironment.getInputGate(1), 
classOf[IntValue])
+      val reader1 = new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
+      val reader2 = new RecordReader[IntValue](
+        getEnvironment.getInputGate(1),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       while(reader1.next() != null){}
       while(reader2.next() != null){}
@@ -162,9 +179,20 @@ object Tasks {
     override def invoke(): Unit = {
       val env = getEnvironment
 
-      val reader1 = new RecordReader[IntValue](env.getInputGate(0), 
classOf[IntValue])
-      val reader2 = new RecordReader[IntValue](env.getInputGate(1), 
classOf[IntValue])
-      val reader3 = new RecordReader[IntValue](env.getInputGate(2), 
classOf[IntValue])
+      val reader1 = new RecordReader[IntValue](
+        env.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
+      val reader2 = new RecordReader[IntValue](
+        env.getInputGate(1),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
+      val reader3 = new RecordReader[IntValue](
+        env.getInputGate(2),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       while(reader1.next() != null){}
       while(reader2.next() != null){}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 1dde85b..ab69ab7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -117,7 +117,8 @@ public class StreamInputProcessor<IN> {
                this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
                
                for (int i = 0; i < recordDeserializers.length; i++) {
-                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
+                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
+                                       
ioManager.getSpillingDirectoriesPaths());
                }
 
                watermarks = new long[inputGate.getNumberOfInputChannels()];

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 07ada23..733e7fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -139,7 +139,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
                
                for (int i = 0; i < recordDeserializers.length; i++) {
-                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
+                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
+                                       
ioManager.getSpillingDirectoriesPaths());
                }
 
                // determine which unioned channels belong to input 1 and which 
belong to input 2

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index c62c881..b2d0196 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
@@ -50,6 +49,7 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -303,7 +303,7 @@ public class StreamMockEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TaskManagerRuntimeInfo("localhost", new 
UnmodifiableConfiguration(new Configuration()));
+               return new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir"));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index deda82f..f054e18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -158,7 +158,7 @@ public class StreamTaskTest {
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
                                mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", new 
Configuration()),
+                               new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
                                mock(TaskMetricGroup.class));
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 06df46f..5506f55 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,27 +18,28 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
 import org.junit.Ignore;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.test.util.JavaProgramTestBase;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -195,7 +196,11 @@ public class NetworkStackThroughputITCase {
 
                @Override
                public void invoke() throws Exception {
-                       RecordReader<SpeedTestRecord> reader = new 
RecordReader<>(getEnvironment().getInputGate(0), SpeedTestRecord.class);
+                       RecordReader<SpeedTestRecord> reader = new 
RecordReader<>(
+                                       getEnvironment().getInputGate(0),
+                                       SpeedTestRecord.class,
+                                       
getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
                        RecordWriter<SpeedTestRecord> writer = new 
RecordWriter<>(getEnvironment().getWriter(0));
 
                        try {
@@ -215,7 +220,10 @@ public class NetworkStackThroughputITCase {
 
                @Override
                public void invoke() throws Exception {
-                       RecordReader<SpeedTestRecord> reader = new 
RecordReader<>(getEnvironment().getInputGate(0), SpeedTestRecord.class);
+                       RecordReader<SpeedTestRecord> reader = new 
RecordReader<>(
+                                       getEnvironment().getInputGate(0),
+                                       SpeedTestRecord.class,
+                                       
getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
                        try {
                                boolean isSlow = 
getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);

Reply via email to