Repository: avro Updated Branches: refs/heads/branch-1.7 011a85653 -> 40733f9ab
AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver. Contributed by Gareth Davis. git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1641894 13f79535-47bb-0310-9956-ffa450edef68 (cherry picked from commit fbaf3c399e2f34e57008d8625c76f0543a3cadf4) Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/40733f9a Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/40733f9a Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/40733f9a Branch: refs/heads/branch-1.7 Commit: 40733f9ab971c7f3fa26bb6c1a2f6dbf88b2a612 Parents: 011a856 Author: Doug Cutting <[email protected]> Authored: Wed Nov 26 19:30:18 2014 +0000 Committer: Nandor Kollar <[email protected]> Committed: Wed Nov 15 11:53:46 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../org/apache/avro/ipc/NettyTransceiver.java | 12 +++ .../ipc/NettyTransceiverWhenFailsToConnect.java | 82 ++++++++++++++++++++ 3 files changed, 97 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9f24bbd..5402b0e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -75,6 +75,9 @@ Trunk (not yet released) AVRO-1883: Java: Fix incompatible schema detection nested in unions. (Yibing Shi via blue) + AVRO-1407: Java: Fix infinite loop on slow connect in NettyTransceiver. 
+ (Gareth Davis via cutting) + Avro 1.7.7 (23 July 2014) NEW FEATURES http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java index f53a5bc..a8a2e3d 100644 --- a/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java +++ b/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java @@ -204,6 +204,18 @@ public class NettyTransceiver extends Transceiver { stateLock.readLock().lock(); try { getChannel(); + } catch (Throwable e) { + // must attempt to clean up any allocated channel future + if (channelFuture != null) { + channelFuture.getChannel().close(); + } + + if (e instanceof IOException) + throw (IOException)e; + if (e instanceof RuntimeException) + throw (RuntimeException)e; + // all that's left is Error + throw (Error)e; } finally { stateLock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/avro/blob/40733f9a/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java ---------------------------------------------------------------------- diff --git a/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java b/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java new file mode 100644 index 0000000..6eefc2d --- /dev/null +++ b/lang/java/ipc/src/test/java/org/apache/avro/ipc/NettyTransceiverWhenFailsToConnect.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.ipc; + +import junit.framework.Assert; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.socket.SocketChannel; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; + +/** + * This is a very specific test that verifies that if the NettyTransceiver fails + * to connect it cleans up the netty channel that it has created. 
+ */
+public class NettyTransceiverWhenFailsToConnect {
+
+  /**
+   * Opens a local {@link ServerSocket} that is never accepted from, constructs a
+   * {@link NettyTransceiver} against its port with {@code 1L} as the third
+   * constructor argument (presumably the connect timeout in milliseconds --
+   * confirm against NettyTransceiver), and verifies that the constructor fails
+   * with an {@link IOException} and that the channel handed out by the factory
+   * is no longer open afterwards.
+   *
+   * @throws Exception if the listening socket cannot be opened or closed; such a
+   *         failure is reported as a test error and never counts as a pass
+   */
+  @Test
+  public void testNettyTransceiverReleasesNettyChannelOnFailingToConnect() throws Exception {
+    ServerSocket serverSocket = null;
+    LastChannelRememberingChannelFactory socketChannelFactory = null;
+
+    try {
+      serverSocket = new ServerSocket(0);
+      socketChannelFactory = new LastChannelRememberingChannelFactory();
+
+      // Catch the failure right here instead of using @Test(expected = ...): the
+      // annotation covers the whole method, so an IOException thrown while
+      // opening or closing the ServerSocket would make the test pass by accident.
+      boolean connectFailed = false;
+      try {
+        new NettyTransceiver(
+          new InetSocketAddress(serverSocket.getLocalPort()),
+          socketChannelFactory,
+          1L
+        );
+      } catch (IOException expected) {
+        connectFailed = true;
+      }
+
+      assertEquals("expected that the transceiver fails to connect with an IOException",
+          true, connectFailed);
+
+      // Guard against a missing channel so that the failure message names the
+      // real problem instead of surfacing as a NullPointerException.
+      SocketChannel channel = socketChannelFactory.lastChannel;
+      assertEquals("expected that the transceiver requested a channel from the factory",
+          true, channel != null);
+      assertEquals("expected that the channel opened by the transceiver is closed",
+          false, channel.isOpen());
+    } finally {
+
+      if (serverSocket != null) {
+        // closing the server socket will actually free up the open channel in the
+        // transceiver, which would have hung otherwise (pre AVRO-1407)
+        serverSocket.close();
+      }
+
+      if (socketChannelFactory != null) {
+        socketChannelFactory.releaseExternalResources();
+      }
+    }
+  }
+
+  /**
+   * Channel factory that remembers the most recent channel it created so the
+   * test can inspect that channel's state afterwards. Declared static because
+   * it uses nothing from the enclosing test instance.
+   */
+  private static class LastChannelRememberingChannelFactory extends NioClientSocketChannelFactory
+      implements ChannelFactory {
+
+    /** Last channel returned by {@link #newChannel}; {@code null} until the first call. */
+    volatile SocketChannel lastChannel;
+
+    @Override
+    public SocketChannel newChannel(ChannelPipeline pipeline) {
+      lastChannel = super.newChannel(pipeline);
+      return lastChannel;
+    }
+  }
+}
