This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d23704d  [FLINK-11676][network] Make ResultPartitionWriter extend 
AutoCloseable
d23704d is described below

commit d23704d9ca58f4866840f63ced1aef21dff10436
Author: Zhijiang <[email protected]>
AuthorDate: Wed Feb 20 18:26:04 2019 +0800

    [FLINK-11676][network] Make ResultPartitionWriter extend AutoCloseable
---
 .../org/apache/flink/runtime/io/network/NetworkEnvironment.java   | 2 +-
 .../runtime/io/network/api/writer/ResultPartitionWriter.java      | 2 +-
 .../flink/runtime/io/network/partition/ResultPartition.java       | 3 ++-
 .../src/main/java/org/apache/flink/runtime/taskmanager/Task.java  | 2 +-
 .../api/writer/AbstractCollectingResultPartitionWriter.java       | 4 ++++
 .../flink/runtime/io/network/api/writer/RecordWriterTest.java     | 8 ++++++++
 6 files changed, 17 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b18f844..d7c3ffc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -294,7 +294,7 @@ public class NetworkEnvironment {
 
                        for (ResultPartition partition : 
task.getProducedPartitions()) {
                                
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
-                               partition.destroyBufferPool();
+                               partition.close();
                        }
 
                        final SingleInputGate[] inputGates = 
task.getAllInputGates();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 4b5986e..661c319 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 /**
  * A buffer-oriented runtime result writer API for producing results.
  */
-public interface ResultPartitionWriter {
+public interface ResultPartitionWriter extends AutoCloseable {
 
        BufferProvider getBufferProvider();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 85fc560..a876bea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -324,7 +324,8 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
                }
        }
 
-       public void destroyBufferPool() {
+       @Override
+       public void close() {
                if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d5168cf..ce139f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1481,7 +1481,7 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
                                // will get misleading errors in the logs.
                                for (ResultPartition partition : 
producedPartitions) {
                                        try {
-                                               partition.destroyBufferPool();
+                                               partition.close();
                                        } catch (Throwable t) {
                                                
ExceptionUtils.rethrowIfFatalError(t);
                                                LOG.error("Failed to release 
result partition buffer pool for task {}.", taskName, t);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 981ca56..5de0d70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -101,5 +101,9 @@ public abstract class 
AbstractCollectingResultPartitionWriter implements ResultP
                flushAll();
        }
 
+       @Override
+       public void close() {
+       }
+
        protected abstract void deserializeBuffer(Buffer buffer) throws 
IOException;
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 552f292..c6d0e02 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -502,6 +502,10 @@ public class RecordWriterTest {
                @Override
                public void flush(int subpartitionIndex) {
                }
+
+               @Override
+               public void close() {
+               }
        }
 
        private static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, 
int targetChannel) throws IOException {
@@ -559,6 +563,10 @@ public class RecordWriterTest {
                @Override
                public void flush(int subpartitionIndex) {
                }
+
+               @Override
+               public void close() {
+               }
        }
 
        private static class ByteArrayIO implements IOReadableWritable {

Reply via email to