This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 479b6890bf1e1e8f4634903975b4983c96913dc2
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Jul 5 16:33:10 2019 +0200

    [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest
---
 .../consumer/InputBuffersMetricsTest.java          | 113 +++++++++++----------
 1 file changed, 59 insertions(+), 54 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index a602648..8d868cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -43,6 +45,18 @@ import static org.junit.Assert.assertEquals;
  */
 public class InputBuffersMetricsTest extends TestLogger {
 
+       private CloseableRegistry closeableRegistry;
+
+       @Before
+       public void setup() {
+               closeableRegistry = new CloseableRegistry();
+       }
+
+       @After
+       public void tearDown() throws IOException {
+               closeableRegistry.close();
+       }
+
        @Test
        public void testCalculateTotalBuffersSize() throws IOException {
                int numberOfRemoteChannels = 2;
@@ -55,11 +69,13 @@ public class InputBuffersMetricsTest extends TestLogger {
                        .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
                        
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
                        .build();
+               closeableRegistry.registerCloseable(network::close);
 
                SingleInputGate inputGate1 = buildInputGate(
                        network,
                        numberOfRemoteChannels,
                        numberOfLocalChannels).f0;
+               closeableRegistry.registerCloseable(inputGate1::close);
 
                SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
                FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
@@ -69,15 +85,12 @@ public class InputBuffersMetricsTest extends TestLogger {
                        exclusiveBuffersUsageGauge,
                        inputGates);
 
-               try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+               closeableRegistry.registerCloseable(network::close);
+               closeableRegistry.registerCloseable(inputGate1::close);
 
-                       closeableRegistry.registerCloseable(network::close);
-                       closeableRegistry.registerCloseable(inputGate1::close);
-
-                       assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
-                       assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
-                       assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
-               }
+               assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+               assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+               assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel 
+ numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
        }
 
        @Test
@@ -96,6 +109,7 @@ public class InputBuffersMetricsTest extends TestLogger {
                        .setNetworkBuffersPerChannel(buffersPerChannel)
                        
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
                        .build();
+               closeableRegistry.registerCloseable(network::close);
 
                Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = 
buildInputGate(
                        network,
@@ -108,6 +122,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 
                SingleInputGate inputGate1 = tuple1.f0;
                SingleInputGate inputGate2 = tuple2.f0;
+               closeableRegistry.registerCloseable(inputGate1::close);
+               closeableRegistry.registerCloseable(inputGate2::close);
 
                List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
 
@@ -119,29 +135,22 @@ public class InputBuffersMetricsTest extends TestLogger {
                        exclusiveBuffersUsageGauge,
                        inputGates);
 
-               try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
-                       assertEquals(0.0, 
exclusiveBuffersUsageGauge.getValue(), 0.0);
-                       assertEquals(0.0, inputBuffersUsageGauge.getValue(), 
0.0);
-
-                       int totalBuffers = extraNetworkBuffersPerGate * 
inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
-
-                       int channelIndex = 1;
-                       for (RemoteInputChannel channel : remoteInputChannels) {
-                               drainAndValidate(
-                                       buffersPerChannel,
-                                       buffersPerChannel * channelIndex++,
-                                       channel,
-                                       closeableRegistry,
-                                       totalBuffers,
-                                       buffersPerChannel * 
totalNumberOfRemoteChannels,
-                                       exclusiveBuffersUsageGauge,
-                                       inputBuffersUsageGauge,
-                                       inputGate1);
-                       }
-               } finally {
-                       inputGate1.close();
-                       inputGate2.close();
-                       network.close();
+               assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+               assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+               int totalBuffers = extraNetworkBuffersPerGate * 
inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+               int channelIndex = 1;
+               for (RemoteInputChannel channel : remoteInputChannels) {
+                       drainAndValidate(
+                               buffersPerChannel,
+                               buffersPerChannel * channelIndex++,
+                               channel,
+                               totalBuffers,
+                               buffersPerChannel * totalNumberOfRemoteChannels,
+                               exclusiveBuffersUsageGauge,
+                               inputBuffersUsageGauge,
+                               inputGate1);
                }
        }
 
@@ -162,6 +171,7 @@ public class InputBuffersMetricsTest extends TestLogger {
                        .setNetworkBuffersPerChannel(buffersPerChannel)
                        
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
                        .build();
+               closeableRegistry.registerCloseable(network::close);
 
                Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = 
buildInputGate(
                        network,
@@ -173,6 +183,8 @@ public class InputBuffersMetricsTest extends TestLogger {
                        numberOfLocalChannelsGate2).f0;
 
                SingleInputGate inputGate1 = tuple1.f0;
+               closeableRegistry.registerCloseable(inputGate1::close);
+               closeableRegistry.registerCloseable(inputGate2::close);
 
                RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
 
@@ -184,54 +196,47 @@ public class InputBuffersMetricsTest extends TestLogger {
                        exclusiveBuffersUsageGauge,
                        inputGates);
 
-               try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
-                       assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 
0.0);
-                       assertEquals(0.0, inputBuffersUsageGauge.getValue(), 
0.0);
+               assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+               assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
 
-                       // drain gate1's exclusive buffers
-                       drainBuffer(buffersPerChannel, remoteInputChannel1, 
closeableRegistry);
+               // drain gate1's exclusive buffers
+               drainBuffer(buffersPerChannel, remoteInputChannel1);
 
-                       int totalBuffers = extraNetworkBuffersPerGate * 
inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+               int totalBuffers = extraNetworkBuffersPerGate * 
inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
 
-                       remoteInputChannel1.requestSubpartition(0);
+               remoteInputChannel1.requestSubpartition(0);
 
-                       int backlog = 3;
-                       int totalRequestedBuffers = buffersPerChannel + backlog;
+               int backlog = 3;
+               int totalRequestedBuffers = buffersPerChannel + backlog;
 
-                       remoteInputChannel1.onSenderBacklog(backlog);
+               remoteInputChannel1.onSenderBacklog(backlog);
 
-                       assertEquals(totalRequestedBuffers, 
remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+               assertEquals(totalRequestedBuffers, 
remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
 
-                       drainBuffer(totalRequestedBuffers, remoteInputChannel1, 
closeableRegistry);
+               drainBuffer(totalRequestedBuffers, remoteInputChannel1);
 
-                       assertEquals(0, 
remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
-                       assertEquals((double) (buffersPerChannel + 
totalRequestedBuffers) / totalBuffers,
-                               inputBuffersUsageGauge.getValue(), 0.0001);
-               } finally {
-                       inputGate1.close();
-                       inputGate2.close();
-                       network.close();
-               }
+               assertEquals(0, 
remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+               assertEquals((double) (buffersPerChannel + 
totalRequestedBuffers) / totalBuffers,
+                       inputBuffersUsageGauge.getValue(), 0.0001);
        }
 
        private void drainAndValidate(
                int numBuffersToRequest,
                int totalRequestedBuffers,
                RemoteInputChannel channel,
-               CloseableRegistry closeableRegistry,
                int totalBuffers,
                int totalExclusiveBuffers,
                ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
                CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
                SingleInputGate inputGate) throws IOException {
 
-               drainBuffer(numBuffersToRequest, channel, closeableRegistry);
+               drainBuffer(numBuffersToRequest, channel);
                assertEquals(totalRequestedBuffers, 
exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
                assertEquals((double) totalRequestedBuffers / 
totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
                assertEquals((double) totalRequestedBuffers / totalBuffers, 
inputBuffersUsageGauge.getValue(), 0.0001);
        }
 
-       private void drainBuffer(int boundary, RemoteInputChannel channel, 
CloseableRegistry closeableRegistry) throws IOException {
+       private void drainBuffer(int boundary, RemoteInputChannel channel) 
throws IOException {
                for (int i = 0; i < boundary; i++) {
                        Buffer buffer = channel.requestBuffer();
                        if (buffer != null) {

Reply via email to