This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 479b6890bf1e1e8f4634903975b4983c96913dc2 Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Jul 5 16:33:10 2019 +0200 [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest --- .../consumer/InputBuffersMetricsTest.java | 113 +++++++++++---------- 1 file changed, 59 insertions(+), 54 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java index a602648..8d868cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -43,6 +45,18 @@ import static org.junit.Assert.assertEquals; */ public class InputBuffersMetricsTest extends TestLogger { + private CloseableRegistry closeableRegistry; + + @Before + public void setup() { + closeableRegistry = new CloseableRegistry(); + } + + @After + public void tearDown() throws IOException { + closeableRegistry.close(); + } + @Test public void testCalculateTotalBuffersSize() throws IOException { int numberOfRemoteChannels = 2; @@ -55,11 +69,13 @@ public class InputBuffersMetricsTest extends TestLogger { .setNetworkBuffersPerChannel(numberOfBufferPerChannel) .setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate) .build(); + closeableRegistry.registerCloseable(network::close); SingleInputGate inputGate1 = buildInputGate( network, numberOfRemoteChannels, numberOfLocalChannels).f0; + closeableRegistry.registerCloseable(inputGate1::close); SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1}; FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates); @@ -69,15 +85,12 @@ public class InputBuffersMetricsTest extends TestLogger { exclusiveBuffersUsageGauge, inputGates); - try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { + closeableRegistry.registerCloseable(network::close); + closeableRegistry.registerCloseable(inputGate1::close); - closeableRegistry.registerCloseable(network::close); - closeableRegistry.registerCloseable(inputGate1::close); - - assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1)); - assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1)); - assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1)); - } + assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1)); } @Test @@ -96,6 +109,7 @@ public class InputBuffersMetricsTest extends TestLogger { .setNetworkBuffersPerChannel(buffersPerChannel) .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate) .build(); + closeableRegistry.registerCloseable(network::close); Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate( network, @@ -108,6 +122,8 @@ public class InputBuffersMetricsTest extends TestLogger { SingleInputGate inputGate1 = tuple1.f0; SingleInputGate inputGate2 = tuple2.f0; + closeableRegistry.registerCloseable(inputGate1::close); + closeableRegistry.registerCloseable(inputGate2::close); List<RemoteInputChannel> remoteInputChannels = tuple1.f1; @@ -119,29 +135,22 @@ public class InputBuffersMetricsTest extends TestLogger { exclusiveBuffersUsageGauge, inputGates); - try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { - assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0); - assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0); - - int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels; - - int channelIndex = 1; - for (RemoteInputChannel channel : remoteInputChannels) { - drainAndValidate( - buffersPerChannel, - buffersPerChannel * channelIndex++, - channel, - closeableRegistry, - totalBuffers, - buffersPerChannel * totalNumberOfRemoteChannels, - exclusiveBuffersUsageGauge, - inputBuffersUsageGauge, - inputGate1); - } - } finally { - inputGate1.close(); - inputGate2.close(); - network.close(); + assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0); + assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0); + + int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels; + + int channelIndex = 1; + for (RemoteInputChannel channel : remoteInputChannels) { + drainAndValidate( + buffersPerChannel, + buffersPerChannel * channelIndex++, + channel, + totalBuffers, + buffersPerChannel * totalNumberOfRemoteChannels, + exclusiveBuffersUsageGauge, + inputBuffersUsageGauge, + inputGate1); } } @@ -162,6 +171,7 @@ public class InputBuffersMetricsTest extends TestLogger { .setNetworkBuffersPerChannel(buffersPerChannel) .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate) .build(); + closeableRegistry.registerCloseable(network::close); Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate( network, @@ -173,6 +183,8 @@ public class InputBuffersMetricsTest extends TestLogger { numberOfLocalChannelsGate2).f0; SingleInputGate inputGate1 = tuple1.f0; + closeableRegistry.registerCloseable(inputGate1::close); + closeableRegistry.registerCloseable(inputGate2::close); RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0); @@ -184,54 +196,47 @@ public class InputBuffersMetricsTest extends TestLogger { exclusiveBuffersUsageGauge, inputGates); - try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { - assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0); - assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0); + assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0); + assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0); - // drain gate1's exclusive buffers - drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry); + // drain gate1's exclusive buffers + drainBuffer(buffersPerChannel, remoteInputChannel1); - int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels; + int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels; - remoteInputChannel1.requestSubpartition(0); + remoteInputChannel1.requestSubpartition(0); - int backlog = 3; - int totalRequestedBuffers = buffersPerChannel + backlog; + int backlog = 3; + int totalRequestedBuffers = buffersPerChannel + backlog; - remoteInputChannel1.onSenderBacklog(backlog); + remoteInputChannel1.onSenderBacklog(backlog); - assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable()); + assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable()); - drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry); + drainBuffer(totalRequestedBuffers, remoteInputChannel1); - assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable()); - assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers, - inputBuffersUsageGauge.getValue(), 0.0001); - } finally { - inputGate1.close(); - inputGate2.close(); - network.close(); - } + assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable()); + assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers, + inputBuffersUsageGauge.getValue(), 0.0001); } private void drainAndValidate( int numBuffersToRequest, int totalRequestedBuffers, RemoteInputChannel channel, - CloseableRegistry closeableRegistry, int totalBuffers, int totalExclusiveBuffers, ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge, CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge, SingleInputGate inputGate) throws IOException { - drainBuffer(numBuffersToRequest, channel, closeableRegistry); + drainBuffer(numBuffersToRequest, channel); assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate)); assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001); assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001); } - private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException { + private void drainBuffer(int boundary, RemoteInputChannel channel) throws IOException { for (int i = 0; i < boundary; i++) { Buffer buffer = channel.requestBuffer(); if (buffer != null) {
