This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dd5ad2c [FLINK-11707][network] InputGate extends AutoCloseable
dd5ad2c is described below
commit dd5ad2cbeff899b9cb51c176719fbdb31ed7570c
Author: zhijiang <[email protected]>
AuthorDate: Wed Mar 27 23:15:40 2019 +0800
[FLINK-11707][network] InputGate extends AutoCloseable
---
.../apache/flink/runtime/io/network/NetworkEnvironment.java | 2 +-
.../flink/runtime/io/network/partition/consumer/InputGate.java | 2 +-
.../runtime/io/network/partition/consumer/SingleInputGate.java | 3 ++-
.../runtime/io/network/partition/consumer/UnionInputGate.java | 4 ++++
.../main/java/org/apache/flink/runtime/taskmanager/Task.java | 9 +++++----
.../flink/runtime/io/network/NetworkEnvironmentTest.java | 4 ++--
.../netty/CreditBasedPartitionRequestClientHandlerTest.java | 8 ++++----
.../io/network/netty/PartitionRequestClientHandlerTest.java | 2 +-
.../runtime/io/network/netty/PartitionRequestClientTest.java | 4 ++--
.../io/network/partition/consumer/LocalInputChannelTest.java | 4 ++--
.../io/network/partition/consumer/SingleInputGateTest.java | 10 +++++-----
.../streaming/runtime/io/BarrierBufferMassiveRandomTest.java | 4 ++++
.../org/apache/flink/streaming/runtime/io/MockInputGate.java | 3 +++
13 files changed, 36 insertions(+), 23 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d7c3ffc..d3cb897 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -303,7 +303,7 @@ public class NetworkEnvironment {
for (SingleInputGate gate : inputGates) {
try {
if (gate != null) {
-
gate.releaseAllResources();
+ gate.close();
}
}
catch (IOException e) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 6e59f91..c270d37 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -65,7 +65,7 @@ import java.util.Optional;
* will have an input gate attached to it. This will provide its input, which
will consist of one
* subpartition from each partition of the intermediate result.
*/
-public interface InputGate {
+public interface InputGate extends AutoCloseable {
int getNumberOfInputChannels();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f51dc74..133f859 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -421,7 +421,8 @@ public class SingleInputGate implements InputGate {
}
}
- public void releaseAllResources() throws IOException {
+ @Override
+ public void close() throws IOException {
boolean released = false;
synchronized (requestLock) {
if (!isReleased) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index d3085cb..ea83004 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -265,6 +265,10 @@ public class UnionInputGate implements InputGate,
InputGateListener {
}
@Override
+ public void close() throws IOException {
+ }
+
+ @Override
public void notifyInputGateNonEmpty(InputGate inputGate) {
queueInputGate(checkNotNull(inputGate));
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ce139f0..698f8bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -54,6 +54,7 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartition;
import
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionMetrics;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateMetrics;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -1442,7 +1443,7 @@ public class Task implements Runnable, TaskActions,
CheckpointListener {
private final Thread executer;
private final String taskName;
private final ResultPartition[] producedPartitions;
- private final SingleInputGate[] inputGates;
+ private final InputGate[] inputGates;
public TaskCanceler(
Logger logger,
@@ -1450,7 +1451,7 @@ public class Task implements Runnable, TaskActions,
CheckpointListener {
Thread executer,
String taskName,
ResultPartition[] producedPartitions,
- SingleInputGate[] inputGates) {
+ InputGate[] inputGates) {
this.logger = logger;
this.invokable = invokable;
@@ -1488,9 +1489,9 @@ public class Task implements Runnable, TaskActions,
CheckpointListener {
}
}
- for (SingleInputGate inputGate : inputGates) {
+ for (InputGate inputGate : inputGates) {
try {
- inputGate.releaseAllResources();
+ inputGate.close();
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
LOG.error("Failed to release
input gate for task {}.", taskName, t);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 8c2fb7a..89742ba 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -134,7 +134,7 @@ public class NetworkEnvironmentTest {
rp.release();
}
for (SingleInputGate ig : inputGates) {
- ig.releaseAllResources();
+ ig.close();
}
network.shutdown();
}
@@ -258,7 +258,7 @@ public class NetworkEnvironmentTest {
rp.release();
}
for (SingleInputGate ig : inputGates) {
- ig.releaseAllResources();
+ ig.close();
}
network.shutdown();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index ea3646d..78c2a67 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -157,7 +157,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
assertEquals(2, inputChannel.getSenderBacklog());
} finally {
// Release all the buffer resources
- inputGate.releaseAllResources();
+ inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
@@ -328,7 +328,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
assertNull(channel.readOutbound());
} finally {
// Release all the buffer resources
- inputGate.releaseAllResources();
+ inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
@@ -370,7 +370,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
assertEquals(2, inputChannel.getUnannouncedCredit());
// Release the input channel
- inputGate.releaseAllResources();
+ inputGate.close();
// it should send a close request after releasing the
input channel,
// but will not notify credits for a released input
channel.
@@ -382,7 +382,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
assertNull(channel.readOutbound());
} finally {
// Release all the buffer resources
- inputGate.releaseAllResources();
+ inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 842aed8..ff604df 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -150,7 +150,7 @@ public class PartitionRequestClientHandlerTest {
assertEquals(1,
inputChannel.getNumberOfQueuedBuffers());
} finally {
// Release all the buffer resources
- inputGate.releaseAllResources();
+ inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
index a11b0ba..42206a6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
@@ -92,7 +92,7 @@ public class PartitionRequestClientTest {
assertNull(channel.readOutbound());
} finally {
// Release all the buffer resources
- inputGate.releaseAllResources();
+ inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
@@ -127,7 +127,7 @@ public class PartitionRequestClientTest {
assertNull(channel.readOutbound());
} finally {
// Release all the buffer resources
- inputGate.releaseAllResources();
+ inputGate.close();
networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 0d82bff..3865c67 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -339,7 +339,7 @@ public class LocalInputChannelTest {
@Override
public void run() {
try {
- gate.releaseAllResources();
+ gate.close();
} catch (IOException ignored) {
}
}
@@ -557,7 +557,7 @@ public class LocalInputChannelTest {
}
}
finally {
- inputGate.releaseAllResources();
+ inputGate.close();
}
return null;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 63f1855..693960a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -298,7 +298,7 @@ public class SingleInputGateTest {
assertTrue("Did not trigger blocking buffer request.", success);
// Release the input gate
- inputGate.releaseAllResources();
+ inputGate.close();
// Wait for Thread to finish and verify expected Exceptions. If
the
// input gate status is not properly checked during requests,
this
@@ -388,7 +388,7 @@ public class SingleInputGateTest {
assertFalse(ch.increaseBackoff());
}
} finally {
- gate.releaseAllResources();
+ gate.close();
netEnv.shutdown();
}
}
@@ -426,7 +426,7 @@ public class SingleInputGateTest {
assertEquals(buffersPerChannel +
extraNetworkBuffersPerGate, bufferPool.countBuffers());
}
} finally {
- inputGate.releaseAllResources();
+ inputGate.close();
network.shutdown();
}
}
@@ -479,7 +479,7 @@ public class SingleInputGateTest {
assertEquals(buffersPerChannel +
extraNetworkBuffersPerGate, bufferPool.countBuffers());
}
} finally {
- inputGate.releaseAllResources();
+ inputGate.close();
network.shutdown();
}
}
@@ -530,7 +530,7 @@ public class SingleInputGateTest {
assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
is(instanceOf((LocalInputChannel.class))));
} finally {
- inputGate.releaseAllResources();
+ inputGate.close();
network.shutdown();
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index afbb036..ded52ff 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -205,5 +205,9 @@ public class BarrierBufferMassiveRandomTest {
public int getPageSize() {
return PAGE_SIZE;
}
+
+ @Override
+ public void close() {
+ }
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index a150b8f..a29cbf5 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -114,4 +114,7 @@ public class MockInputGate implements InputGate {
public void registerListener(InputGateListener listener) {
}
+ @Override
+ public void close() {
+ }
}