This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd5ad2c  [FLINK-11707][network] InputGate extends AutoCloseable
dd5ad2c is described below

commit dd5ad2cbeff899b9cb51c176719fbdb31ed7570c
Author: zhijiang <[email protected]>
AuthorDate: Wed Mar 27 23:15:40 2019 +0800

    [FLINK-11707][network] InputGate extends AutoCloseable
---
 .../apache/flink/runtime/io/network/NetworkEnvironment.java    |  2 +-
 .../flink/runtime/io/network/partition/consumer/InputGate.java |  2 +-
 .../runtime/io/network/partition/consumer/SingleInputGate.java |  3 ++-
 .../runtime/io/network/partition/consumer/UnionInputGate.java  |  4 ++++
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java   |  9 +++++----
 .../flink/runtime/io/network/NetworkEnvironmentTest.java       |  4 ++--
 .../netty/CreditBasedPartitionRequestClientHandlerTest.java    |  8 ++++----
 .../io/network/netty/PartitionRequestClientHandlerTest.java    |  2 +-
 .../runtime/io/network/netty/PartitionRequestClientTest.java   |  4 ++--
 .../io/network/partition/consumer/LocalInputChannelTest.java   |  4 ++--
 .../io/network/partition/consumer/SingleInputGateTest.java     | 10 +++++-----
 .../streaming/runtime/io/BarrierBufferMassiveRandomTest.java   |  4 ++++
 .../org/apache/flink/streaming/runtime/io/MockInputGate.java   |  3 +++
 13 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d7c3ffc..d3cb897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -303,7 +303,7 @@ public class NetworkEnvironment {
                                for (SingleInputGate gate : inputGates) {
                                        try {
                                                if (gate != null) {
-                                                       gate.releaseAllResources();
+                                                       gate.close();
                                                }
                                        }
                                        catch (IOException e) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 6e59f91..c270d37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -65,7 +65,7 @@ import java.util.Optional;
  * will have an input gate attached to it. This will provide its input, which will consist of one
  * subpartition from each partition of the intermediate result.
  */
-public interface InputGate {
+public interface InputGate extends AutoCloseable {
 
        int getNumberOfInputChannels();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f51dc74..133f859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -421,7 +421,8 @@ public class SingleInputGate implements InputGate {
                }
        }
 
-       public void releaseAllResources() throws IOException {
+       @Override
+       public void close() throws IOException {
                boolean released = false;
                synchronized (requestLock) {
                        if (!isReleased) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index d3085cb..ea83004 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -265,6 +265,10 @@ public class UnionInputGate implements InputGate, InputGateListener {
        }
 
        @Override
+       public void close() throws IOException {
+       }
+
+       @Override
        public void notifyInputGateNonEmpty(InputGate inputGate) {
                queueInputGate(checkNotNull(inputGate));
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ce139f0..698f8bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionMetrics;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGateMetrics;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -1442,7 +1443,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
                private final Thread executer;
                private final String taskName;
                private final ResultPartition[] producedPartitions;
-               private final SingleInputGate[] inputGates;
+               private final InputGate[] inputGates;
 
                public TaskCanceler(
                                Logger logger,
@@ -1450,7 +1451,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
                                Thread executer,
                                String taskName,
                                ResultPartition[] producedPartitions,
-                               SingleInputGate[] inputGates) {
+                               InputGate[] inputGates) {
 
                        this.logger = logger;
                        this.invokable = invokable;
@@ -1488,9 +1489,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
                                        }
                                }
 
-                               for (SingleInputGate inputGate : inputGates) {
+                               for (InputGate inputGate : inputGates) {
                                        try {
-                                               inputGate.releaseAllResources();
+                                               inputGate.close();
                                        } catch (Throwable t) {
                                                ExceptionUtils.rethrowIfFatalError(t);
                                                LOG.error("Failed to release input gate for task {}.", taskName, t);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 8c2fb7a..89742ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -134,7 +134,7 @@ public class NetworkEnvironmentTest {
                        rp.release();
                }
                for (SingleInputGate ig : inputGates) {
-                       ig.releaseAllResources();
+                       ig.close();
                }
                network.shutdown();
        }
@@ -258,7 +258,7 @@ public class NetworkEnvironmentTest {
                        rp.release();
                }
                for (SingleInputGate ig : inputGates) {
-                       ig.releaseAllResources();
+                       ig.close();
                }
                network.shutdown();
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index ea3646d..78c2a67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -157,7 +157,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
                        assertEquals(2, inputChannel.getSenderBacklog());
                } finally {
                        // Release all the buffer resources
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
@@ -328,7 +328,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
                        assertNull(channel.readOutbound());
                } finally {
                        // Release all the buffer resources
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
@@ -370,7 +370,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
                        assertEquals(2, inputChannel.getUnannouncedCredit());
 
                        // Release the input channel
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        // it should send a close request after releasing the input channel,
                        // but will not notify credits for a released input channel.
@@ -382,7 +382,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
                        assertNull(channel.readOutbound());
                } finally {
                        // Release all the buffer resources
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 842aed8..ff604df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -150,7 +150,7 @@ public class PartitionRequestClientHandlerTest {
                        assertEquals(1, inputChannel.getNumberOfQueuedBuffers());
                } finally {
                        // Release all the buffer resources
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
index a11b0ba..42206a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
@@ -92,7 +92,7 @@ public class PartitionRequestClientTest {
                        assertNull(channel.readOutbound());
                } finally {
                        // Release all the buffer resources
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
@@ -127,7 +127,7 @@ public class PartitionRequestClientTest {
                        assertNull(channel.readOutbound());
                } finally {
                        // Release all the buffer resources
-                       inputGate.releaseAllResources();
+                       inputGate.close();
 
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 0d82bff..3865c67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -339,7 +339,7 @@ public class LocalInputChannelTest {
                        @Override
                        public void run() {
                                try {
-                                       gate.releaseAllResources();
+                                       gate.close();
                                } catch (IOException ignored) {
                                }
                        }
@@ -557,7 +557,7 @@ public class LocalInputChannelTest {
                                }
                        }
                        finally {
-                               inputGate.releaseAllResources();
+                               inputGate.close();
                        }
 
                        return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 63f1855..693960a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -298,7 +298,7 @@ public class SingleInputGateTest {
                assertTrue("Did not trigger blocking buffer request.", success);
 
                // Release the input gate
-               inputGate.releaseAllResources();
+               inputGate.close();
 
                // Wait for Thread to finish and verify expected Exceptions. If the
                // input gate status is not properly checked during requests, this
@@ -388,7 +388,7 @@ public class SingleInputGateTest {
                                assertFalse(ch.increaseBackoff());
                        }
                } finally {
-                       gate.releaseAllResources();
+                       gate.close();
                        netEnv.shutdown();
                }
        }
@@ -426,7 +426,7 @@ public class SingleInputGateTest {
                                assertEquals(buffersPerChannel + extraNetworkBuffersPerGate, bufferPool.countBuffers());
                        }
                } finally {
-                       inputGate.releaseAllResources();
+                       inputGate.close();
                        network.shutdown();
                }
        }
@@ -479,7 +479,7 @@ public class SingleInputGateTest {
                                assertEquals(buffersPerChannel + extraNetworkBuffersPerGate, bufferPool.countBuffers());
                        }
                } finally {
-                       inputGate.releaseAllResources();
+                       inputGate.close();
                        network.shutdown();
                }
        }
@@ -530,7 +530,7 @@ public class SingleInputGateTest {
                        assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
                                is(instanceOf((LocalInputChannel.class))));
                } finally {
-                       inputGate.releaseAllResources();
+                       inputGate.close();
                        network.shutdown();
                }
        }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index afbb036..ded52ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -205,5 +205,9 @@ public class BarrierBufferMassiveRandomTest {
                public int getPageSize() {
                        return PAGE_SIZE;
                }
+
+               @Override
+               public void close() {
+               }
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index a150b8f..a29cbf5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -114,4 +114,7 @@ public class MockInputGate implements InputGate {
        public void registerListener(InputGateListener listener) {
        }
 
+       @Override
+       public void close() {
+       }
 }

Reply via email to