This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch hubspot-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0c2146587884d676573ff0fc54629febd2d5906b
Author: Bryan Beaudreault <bbeaudrea...@hubspot.com>
AuthorDate: Tue Mar 15 12:11:10 2022 -0400

    Revert "Upstream Callers (#15)"
    
    See HBasePlanning/issues/806 for details
    
    This reverts commit 8cc6bfb5b22b2b16f2618f02b3aeb15d98d050f0.
---
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 15 +---
 .../java/org/apache/hadoop/hbase/ipc/Call.java     |  7 +-
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  |  5 --
 .../apache/hadoop/hbase/ipc/UpstreamCaller.java    | 18 -----
 hbase-protocol-shaded/src/main/protobuf/RPC.proto  |  3 -
 hbase-protocol/src/main/protobuf/RPC.proto         |  3 -
 .../org/apache/hadoop/hbase/ipc/CallRunner.java    |  4 +-
 .../apache/hadoop/hbase/ipc/RpcCallContext.java    | 15 +---
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  8 ---
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   | 84 ----------------------
 .../ipc/TestRpcServerSlowConnectionSetup.java      |  2 +-
 .../hbase/namequeues/TestNamedQueueRecorder.java   |  4 --
 .../store/region/TestRegionProcedureStore.java     |  4 --
 13 files changed, 8 insertions(+), 164 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 123a67ef258..a57672f02ed 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.PoolMap;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
@@ -128,8 +126,6 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
   protected final int readTO;
   protected final int writeTO;
 
-  protected final UpstreamCaller upstreamCaller;
-
   private final PoolMap<ConnectionId, T> connections;
 
   private final AtomicInteger callIdCnt = new AtomicInteger(0);
@@ -190,15 +186,6 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
       }
     }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
 
-    String className = conf.get(UpstreamCaller.HBASE_UPSTREAM_CALLER);
-    if (StringUtils.isEmpty(className)) {
-      LOG.info("No " + UpstreamCaller.HBASE_UPSTREAM_CALLER + " is set.");
-      this.upstreamCaller = UpstreamCaller.NONE;
-    } else {
-      this.upstreamCaller = 
ReflectionUtils.instantiateWithCustomCtor(className,
-        null, null);
-    }
-
     if (LOG.isDebugEnabled()) {
       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", 
tcpKeepAlive="
           + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", 
connectTO=" + this.connectTO
@@ -420,7 +407,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
 
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), 
returnType,
-        hrc.getCallTimeout(), hrc.getPriority(), 
upstreamCaller.getUpstreamCaller().orElse(null), new RpcCallback<Call>() {
+        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
           @Override
           public void run(Call call) {
             counter.decrementAndGet();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index a8ad711b9f6..7793680ca54 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -55,18 +55,17 @@ class Call {
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
   final int priority;
-  final String upstreamCaller;
   final MetricsConnection.CallStats callStats;
   private final RpcCallback<Call> callback;
   final Span span;
   Timeout timeoutTask;
 
-  protected Call(int id, final Descriptors.MethodDescriptor md, Message param, 
final CellScanner cells, final Message responseDefaultType, int timeout, int 
priority,
-    String upstreamCaller, RpcCallback<Call> callback, 
MetricsConnection.CallStats callStats) {
+  protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
+      final CellScanner cells, final Message responseDefaultType, int timeout, 
int priority,
+      RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
-    this.upstreamCaller = upstreamCaller;
     this.callStats = callStats;
     this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
     this.responseDefaultType = responseDefaultType;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index de1735c7b95..11d150e0878 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -126,11 +126,6 @@ class IPCUtil {
     if (call.priority != HConstants.PRIORITY_UNSET) {
       builder.setPriority(call.priority);
     }
-
-    if (call.upstreamCaller != null) {
-      builder.setUpstreamCaller(call.upstreamCaller);
-    }
-
     builder.setTimeout(call.timeout);
 
     return builder.build();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java
deleted file mode 100644
index 1f2b9332ac6..00000000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.Optional;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-@InterfaceAudience.LimitedPrivate("hubspot")
-@InterfaceStability.Unstable
-public interface UpstreamCaller {
-
-  UpstreamCaller NONE = new UpstreamCaller() {};
-
-  String HBASE_UPSTREAM_CALLER = "hbase.upstream.caller.impl";
-
-  default Optional<String> getUpstreamCaller() {
-    return Optional.empty();
-  }
-}
diff --git a/hbase-protocol-shaded/src/main/protobuf/RPC.proto 
b/hbase-protocol-shaded/src/main/protobuf/RPC.proto
index e05f5523999..1ccf6e84ee3 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RPC.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RPC.proto
@@ -146,9 +146,6 @@ message RequestHeader {
   // See HConstants.
   optional uint32 priority = 6;
   optional uint32 timeout = 7;
-
-  // Name of upstream caller, ie. grpc or rest caller
-  optional string upstream_caller = 100;
 }
 
 message ResponseHeader {
diff --git a/hbase-protocol/src/main/protobuf/RPC.proto 
b/hbase-protocol/src/main/protobuf/RPC.proto
index 57511ca906d..25e051430e2 100644
--- a/hbase-protocol/src/main/protobuf/RPC.proto
+++ b/hbase-protocol/src/main/protobuf/RPC.proto
@@ -127,9 +127,6 @@ message RequestHeader {
   // See HConstants.
   optional uint32 priority = 6;
   optional uint32 timeout = 7;
-
-  // Name of upstream caller, ie. grpc or rest caller
-  optional string upstream_caller = 100;
 }
 
 message ResponseHeader {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b206fcd3e2a..f5e12ddeca1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -110,9 +110,9 @@ public class CallRunner {
       this.status.setStatus("Setting up call");
       this.status.setConnection(call.getRemoteAddress().getHostAddress(), 
call.getRemotePort());
       if (RpcServer.LOG.isTraceEnabled()) {
-        Optional<String> requestUserName = call.getRequestUserName();
+        Optional<User> remoteUser = call.getRequestUser();
         RpcServer.LOG.trace(call.toShortString() + " executing as " +
-            (requestUserName.orElse("NULL principal")));
+            (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL 
principal"));
       }
       Throwable errorThrowable = null;
       String error = null;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index f65bb4a4ce0..6a4d3a29a52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -55,24 +55,11 @@ public interface RpcCallContext {
    */
   Optional<User> getRequestUser();
 
-  /**
-   * When an HBase client is used as a proxy for connecting to HBase, the
-   * {@link #getRequestUser()} will be the name of the proxy. This will be
-   * the name of the client who called the proxy.
-   * @return The upstream caller for this call
-   */
-  Optional<String> getUpstreamCaller();
-
   /**
    * @return Current request's user name or not present if none ongoing.
    */
   default Optional<String> getRequestUserName() {
-    return getRequestUser()
-      .map(User::getShortName)
-      .map(userName -> getUpstreamCaller()
-        .map(upstreamCaller -> upstreamCaller + ".via." + userName)
-        .orElse(userName)
-      );
+    return getRequestUser().map(User::getShortName);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 50dd0e1987e..9a4c11ad404 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -202,14 +202,6 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     return this.header.getPriority();
   }
 
-  @Override
-  public Optional<String> getUpstreamCaller() {
-    if (this.header.hasUpstreamCaller()) {
-      return Optional.of(this.header.getUpstreamCaller());
-    }
-    return Optional.empty();
-  }
-
   /*
    * Short string representation without param info because param itself could 
be huge depends on
    * the payload of a command
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 3c166ba7e72..87561bac745 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -36,8 +35,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -51,7 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,86 +154,6 @@ public abstract class AbstractTestIPC {
   protected abstract AbstractRpcClient<?> 
createRpcClientRTEDuringConnectionSetup(
       Configuration conf) throws IOException;
 
-  @Test
-  public void testDefaultUpstreamCallerPropagation() throws Exception {
-    UpstreamCallerExtractingRpcScheduler scheduler = new 
UpstreamCallerExtractingRpcScheduler(CONF, 1);
-    Configuration conf = HBaseConfiguration.create();
-    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
-      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-        SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-      scheduler);
-
-    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
-      rpcServer.start();
-      BlockingInterface stub = newBlockingStub(client, 
rpcServer.getListenerAddress());
-      stub.echo(null, 
EchoRequestProto.newBuilder().setMessage("hello").build());
-
-      RpcCall rpcCall = scheduler.getCall().get();
-
-      assertNotNull(rpcCall);
-      assertFalse(rpcCall.getUpstreamCaller().isPresent());
-      assertTrue(rpcCall.getRequestUserName().isPresent());
-      assertFalse(rpcCall.getRequestUserName().get().contains(".via."));
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  @Test
-  public void testCustomUpstreamCallerPropagation() throws Exception {
-    UpstreamCallerExtractingRpcScheduler scheduler = new 
UpstreamCallerExtractingRpcScheduler(CONF, 1);
-    Configuration conf = HBaseConfiguration.create();
-    conf.set(UpstreamCaller.HBASE_UPSTREAM_CALLER, 
TestUpstreamCaller.class.getName());
-
-    RpcServer rpcServer = createRpcServer(null, "testRpcServer",
-      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-        SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-      scheduler);
-
-    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
-      rpcServer.start();
-      BlockingInterface stub = newBlockingStub(client, 
rpcServer.getListenerAddress());
-      stub.echo(null, 
EchoRequestProto.newBuilder().setMessage("hello").build());
-
-      RpcCall rpcCall = scheduler.getCall().get();
-
-      assertNotNull(rpcCall);
-      assertTrue(rpcCall.getUpstreamCaller().isPresent());
-      assertEquals("test", rpcCall.getUpstreamCaller().get());
-      assertTrue(rpcCall.getRequestUserName().isPresent());
-      
assertTrue(rpcCall.getRequestUserName().get().matches("test\\.via\\..+"));
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  public static class UpstreamCallerExtractingRpcScheduler extends 
FifoRpcScheduler {
-
-    private AtomicReference<RpcCall> call = new AtomicReference<>(null);
-
-    public UpstreamCallerExtractingRpcScheduler(Configuration conf, int 
handlerCount) {
-      super(conf, handlerCount);
-    }
-
-    @Override
-    public boolean dispatch(CallRunner task) throws IOException, 
InterruptedException {
-      call.set(task.getRpcCall());
-      return super.dispatch(task);
-    }
-
-    public AtomicReference<RpcCall> getCall() {
-      return call;
-    }
-  }
-
-  public static class TestUpstreamCaller implements UpstreamCaller {
-
-    @Override
-    public Optional<String> getUpstreamCaller() {
-      return Optional.of("test");
-    }
-  }
-
   @Test
   public void testRTEDuringConnectionSetup() throws Exception {
     Configuration conf = HBaseConfiguration.create();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
index 1cd2c0c8c37..aedf57e72f0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
@@ -124,7 +124,7 @@ public class TestRpcServerSlowConnectionSetup {
     int callId = 10;
     Call call = new Call(callId, 
TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
         EmptyRequestProto.getDefaultInstance(), null, 
EmptyResponseProto.getDefaultInstance(), 1000,
-        HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats());
+        HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
     RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
     dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, 
call.param));
     requestHeader.writeDelimitedTo(dos);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index 2740cc3ad6a..161bcc11a20 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -705,10 +705,6 @@ public class TestNamedQueueRecorder {
         return getUser(userName);
       }
 
-      @Override public Optional<String> getUpstreamCaller() {
-        return Optional.empty();
-      }
-
       @Override
       public InetAddress getRemoteAddress() {
         return null;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index c29b0da12c8..d7a0ce76c9e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -258,10 +258,6 @@ public class TestRegionProcedureStore extends 
RegionProcedureStoreTestBase {
         return Optional.empty();
       }
 
-      @Override public Optional<String> getUpstreamCaller() {
-        return Optional.empty();
-      }
-
       @Override
       public InetAddress getRemoteAddress() {
         return null;

Reply via email to