This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 0c49431 GEODE-5270: Closing the connection. (#2008) 0c49431 is described below commit 0c4943189145bccdcf1f0e3e8eff7e7797b0d0bc Author: Nabarun Nag <nabarun...@users.noreply.github.com> AuthorDate: Fri Jun 1 11:11:11 2018 -0700 GEODE-5270: Closing the connection. (#2008) * Shutting down the ackThreadReader on stop processing. * closing the input stream on closing the connection by stomper thread. Co-authored-by: bijukunjummen <biju.kunjum...@gmail.com> --- .../client/internal/pooling/PooledConnection.java | 1 + .../pooling/PooledConnectionJUnitTest.java | 46 ++++++++++++++++++++++ .../wan/GatewaySenderEventRemoteDispatcher.java | 3 +- ...atewaySenderEventRemoteDispatcherJUnitTest.java | 14 +++++++ 4 files changed, 63 insertions(+), 1 deletion(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java index 7698b10..bcff681 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java @@ -92,6 +92,7 @@ class PooledConnection implements Connection { public void internalClose(boolean keepAlive) throws Exception { try { Connection con = this.connection; + con.getInputStream().close(); if (con != null) { con.close(keepAlive); } diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/PooledConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/PooledConnectionJUnitTest.java new file mode 100644 index 0000000..14289d0 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/PooledConnectionJUnitTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.client.internal.pooling; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.InputStream; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.client.internal.Connection; +import org.apache.geode.test.junit.categories.ClientServerTest; +import org.apache.geode.test.junit.categories.IntegrationTest; + +@Category({IntegrationTest.class, ClientServerTest.class}) +public class PooledConnectionJUnitTest { + + @Test + public void internalCloseMustCloseTheInputStream() throws Exception { + Connection connection = mock(Connection.class); + ConnectionManagerImpl connectionManager = mock(ConnectionManagerImpl.class); + InputStream inputStream = mock(InputStream.class); + when(connection.getInputStream()).thenReturn(inputStream); + PooledConnection pooledConnection = new PooledConnection(connectionManager, connection); + doNothing().when(connection).close(false); + pooledConnection.internalClose(false); + verify(inputStream, times(1)).close(); + } +} diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 8ae7fb0..d6bbc9d 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -790,7 +790,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis } } - private void shutDownAckReaderConnection(Connection connection) { + protected void shutDownAckReaderConnection(Connection connection) { Connection conn = connection; // attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck // on a read @@ -825,6 +825,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis public void shutDownAckReaderConnection() { if (ackReaderThread != null) { ackReaderThread.shutDownAckReaderConnection(connection); + ackReaderThread.shutdown(); } } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java index cb34e88..2d5b039 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java @@ -43,4 +43,18 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest { assertNull(dispatcher.getConnection(false)); assertTrue(ackReaderThread.isShutdown()); } + + @Test + public void shuttingDownAckThreadReaderConnectionShouldshutdownTheAckThreadReader() { + AbstractGatewaySender sender = mock(AbstractGatewaySender.class); + AbstractGatewaySenderEventProcessor eventProcessor = + mock(AbstractGatewaySenderEventProcessor.class); + GatewaySenderEventRemoteDispatcher dispatcher = + new GatewaySenderEventRemoteDispatcher(eventProcessor, null); + GatewaySenderEventRemoteDispatcher.AckReaderThread ackReaderThread = + dispatcher.new AckReaderThread(sender, "AckReaderThread"); + dispatcher.setAckReaderThread(ackReaderThread); + dispatcher.shutDownAckReaderConnection(); + assertTrue(ackReaderThread.isShutdown()); + } } -- To stop receiving notification emails like this one, please contact n...@apache.org.