This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push: new bc0c12d5fd1 HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with broken pipe instead of bad auth (#5740) bc0c12d5fd1 is described below commit bc0c12d5fd19ca5cba84d2900b24505f1ff91dfc Author: Duo Zhang <zhang...@apache.org> AuthorDate: Wed Mar 6 16:08:36 2024 +0800 HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with broken pipe instead of bad auth (#5740) Also change the IPC related tests to test different combinations of rpc server&client, for example, NettyRpcClient and SimpleRpcServer Signed-off-by: Nick Dimiduk <ndimi...@apache.org> Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org> (cherry picked from commit 2306820df8b41d9af5227465ee2cf9e18b8f0b5c) --- .../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 124 ++++++++++++++++++--- .../apache/hadoop/hbase/ipc/TestBlockingIPC.java | 55 ++------- .../org/apache/hadoop/hbase/ipc/TestNettyIPC.java | 76 +++---------- 3 files changed, 132 insertions(+), 123 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 1f26cc8a0b2..eb158d59fc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -33,7 +33,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -53,6 +52,7 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -68,18 +68,22 @@ import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; import org.hamcrest.Matcher; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runners.Parameterized.Parameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -101,20 +105,26 @@ public abstract class AbstractTestIPC { private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); protected static final Configuration CONF = HBaseConfiguration.create(); - static { - // Set the default to be the old SimpleRpcServer. Subclasses test it and netty. - CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); - } - protected abstract RpcServer createRpcServer(final Server server, final String name, - final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException; + private RpcServer createRpcServer(Server server, String name, + List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf, + RpcScheduler scheduler) throws IOException { + return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); + } protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); @Rule public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + @Parameter(0) + public Class<? extends RpcServer> rpcServerImpl; + + @Before + public void setUpBeforeTest() { + CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class); + } + /** * Ensure we do not HAVE TO HAVE a codec. */ @@ -326,15 +336,81 @@ public abstract class AbstractTestIPC { } } - protected abstract RpcServer createTestFailingRpcServer(final Server server, final String name, + private static class FailingSimpleRpcServer extends SimpleRpcServer { + + FailingSimpleRpcServer(Server server, String name, + List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + super(server, name, services, bindAddress, conf, scheduler, true); + } + + final class FailingConnection extends SimpleServerRpcConnection { + private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel, + long lastContact) { + super(rpcServer, channel, lastContact); + } + + @Override + public void processRequest(ByteBuff buf) throws IOException, InterruptedException { + // this will throw exception after the connection header is read, and an RPC is sent + // from client + throw new DoNotRetryIOException("Failing for test"); + } + } + + @Override + protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { + return new FailingConnection(this, channel, time); + } + } + + private static class FailingNettyRpcServer extends NettyRpcServer { + + FailingNettyRpcServer(Server server, String name, + List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { + super(server, name, services, bindAddress, conf, scheduler, true); + } + + static final class FailingConnection extends NettyServerRpcConnection { + private FailingConnection(FailingNettyRpcServer rpcServer, Channel channel) { + super(rpcServer, channel); + } + + @Override + public void processRequest(ByteBuff buf) throws IOException, InterruptedException { + // this will throw exception after the connection header is read, and an RPC is sent + // from client + throw new DoNotRetryIOException("Failing for test"); + } + } + + @Override + protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { + return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) { + @Override + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + return new FailingConnection(FailingNettyRpcServer.this, channel); + } + }; + } + } + + private RpcServer createTestFailingRpcServer(final String name, final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException; + Configuration conf, RpcScheduler scheduler) throws IOException { + if (rpcServerImpl.equals(NettyRpcServer.class)) { + return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler); + } else { + return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler); + } + } /** Tests that the connection closing is handled by the client with outstanding RPC calls */ @Test public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { Configuration conf = new Configuration(CONF); - RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer", + RpcServer rpcServer = createTestFailingRpcServer("testRpcServer", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); @@ -543,19 +619,33 @@ public abstract class AbstractTestIPC { protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf); + private IOException doBadPreableHeaderCall(BlockingInterface stub) { + ServiceException se = assertThrows(ServiceException.class, + () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build())); + return ProtobufUtil.handleRemoteException(se); + } + @Test - public void testBadPreambleHeader() throws IOException, ServiceException { + public void testBadPreambleHeader() throws Exception { Configuration clientConf = new Configuration(CONF); RpcServer rpcServer = createRpcServer(null, "testRpcServer", Collections.emptyList(), new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - ServiceException se = assertThrows(ServiceException.class, - () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build())); - IOException ioe = ProtobufUtil.handleRemoteException(se); - assertThat(ioe, instanceOf(BadAuthException.class)); - assertThat(ioe.getMessage(), containsString("authName=unknown")); + BadAuthException error = null; + // for SimpleRpcServer, it is possible that we get a broken pipe before getting the + // BadAuthException, so we add some retries here, see HBASE-28417 + for (int i = 0; i < 10; i++) { + IOException ioe = doBadPreableHeaderCall(stub); + if (ioe instanceof BadAuthException) { + error = (BadAuthException) ioe; + break; + } + Thread.sleep(100); + } + assertNotNull("Can not get expected BadAuthException", error); + assertThat(error.getMessage(), containsString("authName=unknown")); } finally { rpcServer.stop(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java index 9c9a6d5d608..e0ff6856571 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java @@ -18,20 +18,20 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) @Category({ RPCTests.class, MediumTests.class }) public class TestBlockingIPC extends AbstractTestIPC { @@ -39,11 +39,10 @@ public class TestBlockingIPC extends AbstractTestIPC { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBlockingIPC.class); - @Override - protected RpcServer createRpcServer(Server server, String name, - List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); + @Parameters(name = "{index}: rpcServerImpl={0}") + public static List<Object[]> data() { + return Arrays.asList(new Object[] { SimpleRpcServer.class }, + new Object[] { NettyRpcServer.class }); } @Override @@ -73,41 +72,6 @@ public class TestBlockingIPC extends AbstractTestIPC { }; } - private static class TestFailingRpcServer extends SimpleRpcServer { - - TestFailingRpcServer(Server server, String name, - List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - super(server, name, services, bindAddress, conf, scheduler, true); - } - - final class FailingConnection extends SimpleServerRpcConnection { - private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel channel, - long lastContact) { - super(rpcServer, channel, lastContact); - } - - @Override - public void processRequest(ByteBuff buf) throws IOException, InterruptedException { - // this will throw exception after the connection header is read, and an RPC is sent - // from client - throw new DoNotRetryIOException("Failing for test"); - } - } - - @Override - protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { - return new FailingConnection(this, channel, time); - } - } - - @Override - protected RpcServer createTestFailingRpcServer(Server server, String name, - List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler); - } - @Override protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) { return new BlockingRpcClient(conf) { @@ -124,7 +88,6 @@ public class TestBlockingIPC extends AbstractTestIPC { } }; } - }; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java index 40fb61e23df..cece884fdb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -18,16 +18,11 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.util.JVM; @@ -40,7 +35,6 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; @@ -54,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestNettyIPC.class); - @Parameters(name = "{index}: EventLoop={0}") - public static Collection<Object[]> parameters() { - List<Object[]> params = new ArrayList<>(); - params.add(new Object[] { "nio" }); - params.add(new Object[] { "perClientNio" }); + private static List<String> getEventLoopTypes() { + List<String> types = new ArrayList<>(); + types.add("nio"); + types.add("perClientNio"); if (JVM.isLinux() && JVM.isAmd64()) { - params.add(new Object[] { "epoll" }); + types.add("epoll"); + } + return types; + } + + @Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}") + public static List<Object[]> parameters() { + List<Object[]> params = new ArrayList<>(); + for (String eventLoopType : getEventLoopTypes()) { + params.add(new Object[] { SimpleRpcServer.class, eventLoopType }); + params.add(new Object[] { NettyRpcServer.class, eventLoopType }); } return params; } - @Parameter + @Parameter(1) public String eventLoopType; private static NioEventLoopGroup NIO; @@ -106,13 +109,6 @@ public class TestNettyIPC extends AbstractTestIPC { } } - @Override - protected RpcServer createRpcServer(Server server, String name, - List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true); - } - @Override protected NettyRpcClient createRpcClientNoCodec(Configuration conf) { setConf(conf); @@ -144,46 +140,6 @@ public class TestNettyIPC extends AbstractTestIPC { }; } - private static class TestFailingRpcServer extends NettyRpcServer { - - TestFailingRpcServer(Server server, String name, - List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - super(server, name, services, bindAddress, conf, scheduler, true); - } - - static final class FailingConnection extends NettyServerRpcConnection { - private FailingConnection(TestFailingRpcServer rpcServer, Channel channel) { - super(rpcServer, channel); - } - - @Override - public void processRequest(ByteBuff buf) throws IOException, InterruptedException { - // this will throw exception after the connection header is read, and an RPC is sent - // from client - throw new DoNotRetryIOException("Failing for test"); - } - } - - @Override - protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { - return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) { - @Override - protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { - return new FailingConnection(TestFailingRpcServer.this, channel); - } - }; - } - } - - @Override - protected RpcServer createTestFailingRpcServer(Server server, String name, - List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, - Configuration conf, RpcScheduler scheduler) throws IOException { - return new TestFailingRpcServer(server, name, services, bindAddress, conf, scheduler); - } - - @Override protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) { return new NettyRpcClient(conf) {