This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new bc0c12d5fd1 HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with broken pipe instead of bad auth (#5740)
bc0c12d5fd1 is described below

commit bc0c12d5fd19ca5cba84d2900b24505f1ff91dfc
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Wed Mar 6 16:08:36 2024 +0800

    HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with broken pipe instead of bad auth (#5740)
    
    Also change the IPC-related tests to cover different combinations of RPC
    server and client implementations, for example NettyRpcClient with SimpleRpcServer.
    
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    (cherry picked from commit 2306820df8b41d9af5227465ee2cf9e18b8f0b5c)
---
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   | 124 ++++++++++++++++++---
 .../apache/hadoop/hbase/ipc/TestBlockingIPC.java   |  55 ++-------
 .../org/apache/hadoop/hbase/ipc/TestNettyIPC.java  |  76 +++----------
 3 files changed, 132 insertions(+), 123 deletions(-)

diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 1f26cc8a0b2..eb158d59fc8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -33,7 +33,6 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -53,6 +52,7 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,18 +68,22 @@ import org.apache.hadoop.hbase.MatcherPredicate;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
 import org.hamcrest.Matcher;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 
 import 
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import 
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -101,20 +105,26 @@ public abstract class AbstractTestIPC {
   private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, 
CELL_BYTES, CELL_BYTES);
 
   protected static final Configuration CONF = HBaseConfiguration.create();
-  static {
-    // Set the default to be the old SimpleRpcServer. Subclasses test it and 
netty.
-    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 
SimpleRpcServer.class.getName());
-  }
 
-  protected abstract RpcServer createRpcServer(final Server server, final 
String name,
-    final List<BlockingServiceAndInterface> services, final InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException;
+  private RpcServer createRpcServer(Server server, String name,
+    List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, 
Configuration conf,
+    RpcScheduler scheduler) throws IOException {
+    return RpcServerFactory.createRpcServer(server, name, services, 
bindAddress, conf, scheduler);
+  }
 
   protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration 
conf);
 
   @Rule
   public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
 
+  @Parameter(0)
+  public Class<? extends RpcServer> rpcServerImpl;
+
+  @Before
+  public void setUpBeforeTest() {
+    CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 
rpcServerImpl, RpcServer.class);
+  }
+
   /**
    * Ensure we do not HAVE TO HAVE a codec.
    */
@@ -326,15 +336,81 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  protected abstract RpcServer createTestFailingRpcServer(final Server server, 
final String name,
+  private static class FailingSimpleRpcServer extends SimpleRpcServer {
+
+    FailingSimpleRpcServer(Server server, String name,
+      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
+      Configuration conf, RpcScheduler scheduler) throws IOException {
+      super(server, name, services, bindAddress, conf, scheduler, true);
+    }
+
+    final class FailingConnection extends SimpleServerRpcConnection {
+      private FailingConnection(FailingSimpleRpcServer rpcServer, 
SocketChannel channel,
+        long lastContact) {
+        super(rpcServer, channel, lastContact);
+      }
+
+      @Override
+      public void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
+        // this will throw exception after the connection header is read, and 
an RPC is sent
+        // from client
+        throw new DoNotRetryIOException("Failing for test");
+      }
+    }
+
+    @Override
+    protected SimpleServerRpcConnection getConnection(SocketChannel channel, 
long time) {
+      return new FailingConnection(this, channel, time);
+    }
+  }
+
+  private static class FailingNettyRpcServer extends NettyRpcServer {
+
+    FailingNettyRpcServer(Server server, String name,
+      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
+      Configuration conf, RpcScheduler scheduler) throws IOException {
+      super(server, name, services, bindAddress, conf, scheduler, true);
+    }
+
+    static final class FailingConnection extends NettyServerRpcConnection {
+      private FailingConnection(FailingNettyRpcServer rpcServer, Channel 
channel) {
+        super(rpcServer, channel);
+      }
+
+      @Override
+      public void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
+        // this will throw exception after the connection header is read, and 
an RPC is sent
+        // from client
+        throw new DoNotRetryIOException("Failing for test");
+      }
+    }
+
+    @Override
+    protected NettyRpcServerPreambleHandler 
createNettyRpcServerPreambleHandler() {
+      return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
+        @Override
+        protected NettyServerRpcConnection 
createNettyServerRpcConnection(Channel channel) {
+          return new FailingConnection(FailingNettyRpcServer.this, channel);
+        }
+      };
+    }
+  }
+
+  private RpcServer createTestFailingRpcServer(final String name,
     final List<BlockingServiceAndInterface> services, final InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException;
+    Configuration conf, RpcScheduler scheduler) throws IOException {
+    if (rpcServerImpl.equals(NettyRpcServer.class)) {
+      return new FailingNettyRpcServer(null, name, services, bindAddress, 
conf, scheduler);
+    } else {
+      return new FailingSimpleRpcServer(null, name, services, bindAddress, 
conf, scheduler);
+    }
+  }
 
   /** Tests that the connection closing is handled by the client with 
outstanding RPC calls */
   @Test
   public void testConnectionCloseWithOutstandingRPCs() throws 
InterruptedException, IOException {
     Configuration conf = new Configuration(CONF);
-    RpcServer rpcServer = createTestFailingRpcServer(null, "testRpcServer",
+    RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, 
null)),
       new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 
1));
 
@@ -543,19 +619,33 @@ public abstract class AbstractTestIPC {
 
   protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration 
conf);
 
+  private IOException doBadPreableHeaderCall(BlockingInterface stub) {
+    ServiceException se = assertThrows(ServiceException.class,
+      () -> stub.echo(null, 
EchoRequestProto.newBuilder().setMessage("hello").build()));
+    return ProtobufUtil.handleRemoteException(se);
+  }
+
   @Test
-  public void testBadPreambleHeader() throws IOException, ServiceException {
+  public void testBadPreambleHeader() throws Exception {
     Configuration clientConf = new Configuration(CONF);
     RpcServer rpcServer = createRpcServer(null, "testRpcServer", 
Collections.emptyList(),
       new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 
1));
     try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, 
rpcServer.getListenerAddress());
-      ServiceException se = assertThrows(ServiceException.class,
-        () -> stub.echo(null, 
EchoRequestProto.newBuilder().setMessage("hello").build()));
-      IOException ioe = ProtobufUtil.handleRemoteException(se);
-      assertThat(ioe, instanceOf(BadAuthException.class));
-      assertThat(ioe.getMessage(), containsString("authName=unknown"));
+      BadAuthException error = null;
+      // for SimpleRpcServer, it is possible that we get a broken pipe before 
getting the
+      // BadAuthException, so we add some retries here, see HBASE-28417
+      for (int i = 0; i < 10; i++) {
+        IOException ioe = doBadPreableHeaderCall(stub);
+        if (ioe instanceof BadAuthException) {
+          error = (BadAuthException) ioe;
+          break;
+        }
+        Thread.sleep(100);
+      }
+      assertNotNull("Can not get expected BadAuthException", error);
+      assertThat(error.getMessage(), containsString("authName=unknown"));
     } finally {
       rpcServer.stop();
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
index 9c9a6d5d608..e0ff6856571 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -18,20 +18,20 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 @Category({ RPCTests.class, MediumTests.class })
 public class TestBlockingIPC extends AbstractTestIPC {
 
@@ -39,11 +39,10 @@ public class TestBlockingIPC extends AbstractTestIPC {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBlockingIPC.class);
 
-  @Override
-  protected RpcServer createRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return RpcServerFactory.createRpcServer(server, name, services, 
bindAddress, conf, scheduler);
+  @Parameters(name = "{index}: rpcServerImpl={0}")
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[] { SimpleRpcServer.class },
+      new Object[] { NettyRpcServer.class });
   }
 
   @Override
@@ -73,41 +72,6 @@ public class TestBlockingIPC extends AbstractTestIPC {
     };
   }
 
-  private static class TestFailingRpcServer extends SimpleRpcServer {
-
-    TestFailingRpcServer(Server server, String name,
-      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-      Configuration conf, RpcScheduler scheduler) throws IOException {
-      super(server, name, services, bindAddress, conf, scheduler, true);
-    }
-
-    final class FailingConnection extends SimpleServerRpcConnection {
-      private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel 
channel,
-        long lastContact) {
-        super(rpcServer, channel, lastContact);
-      }
-
-      @Override
-      public void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
-        // this will throw exception after the connection header is read, and 
an RPC is sent
-        // from client
-        throw new DoNotRetryIOException("Failing for test");
-      }
-    }
-
-    @Override
-    protected SimpleServerRpcConnection getConnection(SocketChannel channel, 
long time) {
-      return new FailingConnection(this, channel, time);
-    }
-  }
-
-  @Override
-  protected RpcServer createTestFailingRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new TestFailingRpcServer(server, name, services, bindAddress, conf, 
scheduler);
-  }
-
   @Override
   protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
     return new BlockingRpcClient(conf) {
@@ -124,7 +88,6 @@ public class TestBlockingIPC extends AbstractTestIPC {
           }
         };
       }
-
     };
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index 40fb61e23df..cece884fdb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -18,16 +18,11 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.util.JVM;
@@ -40,7 +35,6 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
 import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
@@ -54,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestNettyIPC.class);
 
-  @Parameters(name = "{index}: EventLoop={0}")
-  public static Collection<Object[]> parameters() {
-    List<Object[]> params = new ArrayList<>();
-    params.add(new Object[] { "nio" });
-    params.add(new Object[] { "perClientNio" });
+  private static List<String> getEventLoopTypes() {
+    List<String> types = new ArrayList<>();
+    types.add("nio");
+    types.add("perClientNio");
     if (JVM.isLinux() && JVM.isAmd64()) {
-      params.add(new Object[] { "epoll" });
+      types.add("epoll");
+    }
+    return types;
+  }
+
+  @Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
+  public static List<Object[]> parameters() {
+    List<Object[]> params = new ArrayList<>();
+    for (String eventLoopType : getEventLoopTypes()) {
+      params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
+      params.add(new Object[] { NettyRpcServer.class, eventLoopType });
     }
     return params;
   }
 
-  @Parameter
+  @Parameter(1)
   public String eventLoopType;
 
   private static NioEventLoopGroup NIO;
@@ -106,13 +109,6 @@ public class TestNettyIPC extends AbstractTestIPC {
     }
   }
 
-  @Override
-  protected RpcServer createRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new NettyRpcServer(server, name, services, bindAddress, conf, 
scheduler, true);
-  }
-
   @Override
   protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
     setConf(conf);
@@ -144,46 +140,6 @@ public class TestNettyIPC extends AbstractTestIPC {
     };
   }
 
-  private static class TestFailingRpcServer extends NettyRpcServer {
-
-    TestFailingRpcServer(Server server, String name,
-      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-      Configuration conf, RpcScheduler scheduler) throws IOException {
-      super(server, name, services, bindAddress, conf, scheduler, true);
-    }
-
-    static final class FailingConnection extends NettyServerRpcConnection {
-      private FailingConnection(TestFailingRpcServer rpcServer, Channel 
channel) {
-        super(rpcServer, channel);
-      }
-
-      @Override
-      public void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
-        // this will throw exception after the connection header is read, and 
an RPC is sent
-        // from client
-        throw new DoNotRetryIOException("Failing for test");
-      }
-    }
-
-    @Override
-    protected NettyRpcServerPreambleHandler 
createNettyRpcServerPreambleHandler() {
-      return new NettyRpcServerPreambleHandler(TestFailingRpcServer.this) {
-        @Override
-        protected NettyServerRpcConnection 
createNettyServerRpcConnection(Channel channel) {
-          return new FailingConnection(TestFailingRpcServer.this, channel);
-        }
-      };
-    }
-  }
-
-  @Override
-  protected RpcServer createTestFailingRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new TestFailingRpcServer(server, name, services, bindAddress, conf, 
scheduler);
-  }
-
-  @Override
   protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
     return new NettyRpcClient(conf) {
 

Reply via email to