This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d23704d [FLINK-11676][network] Make ResultPartitionWriter extend AutoCloseable
d23704d is described below
commit d23704d9ca58f4866840f63ced1aef21dff10436
Author: Zhijiang <[email protected]>
AuthorDate: Wed Feb 20 18:26:04 2019 +0800
[FLINK-11676][network] Make ResultPartitionWriter extend AutoCloseable
---
.../org/apache/flink/runtime/io/network/NetworkEnvironment.java | 2 +-
.../runtime/io/network/api/writer/ResultPartitionWriter.java | 2 +-
.../flink/runtime/io/network/partition/ResultPartition.java | 3 ++-
.../src/main/java/org/apache/flink/runtime/taskmanager/Task.java | 2 +-
.../api/writer/AbstractCollectingResultPartitionWriter.java | 4 ++++
.../flink/runtime/io/network/api/writer/RecordWriterTest.java | 8 ++++++++
6 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b18f844..d7c3ffc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -294,7 +294,7 @@ public class NetworkEnvironment {
for (ResultPartition partition : task.getProducedPartitions()) {
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
- partition.destroyBufferPool();
+ partition.close();
}
final SingleInputGate[] inputGates = task.getAllInputGates();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 4b5986e..661c319 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -27,7 +27,7 @@ import java.io.IOException;
/**
* A buffer-oriented runtime result writer API for producing results.
*/
-public interface ResultPartitionWriter {
+public interface ResultPartitionWriter extends AutoCloseable {
BufferProvider getBufferProvider();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 85fc560..a876bea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -324,7 +324,8 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
}
}
- public void destroyBufferPool() {
+ @Override
+ public void close() {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d5168cf..ce139f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1481,7 +1481,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// will get misleading errors in the logs.
for (ResultPartition partition : producedPartitions) {
try {
- partition.destroyBufferPool();
+ partition.close();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 981ca56..5de0d70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -101,5 +101,9 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
flushAll();
}
+ @Override
+ public void close() {
+ }
+
protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 552f292..c6d0e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -502,6 +502,10 @@ public class RecordWriterTest {
@Override
public void flush(int subpartitionIndex) {
}
+
+ @Override
+ public void close() {
+ }
}
private static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
@@ -559,6 +563,10 @@ public class RecordWriterTest {
@Override
public void flush(int subpartitionIndex) {
}
+
+ @Override
+ public void close() {
+ }
}
private static class ByteArrayIO implements IOReadableWritable {