This is an automated email from the ASF dual-hosted git repository.

meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c7b80e565 HBASE-29765 Make client connection header attributes configurable (#7553)
e0c7b80e565 is described below

commit e0c7b80e565cd6410f37eb81a475b3e811af82fe
Author: Balazs Meszaros <[email protected]>
AuthorDate: Mon Jan 12 16:38:20 2026 +0100

    HBASE-29765 Make client connection header attributes configurable (#7553)
    
    hbase.client.header.* configuration settings are passed to the server side in RPC ConnectionHeader attributes.
    
    Signed-off-by: Dávid Paksy <[email protected]>
---
 .../org/apache/hadoop/hbase/ipc/RpcConnection.java |  27 +++++
 .../java/org/apache/hadoop/hbase/HConstants.java   |   6 +
 .../hadoop/hbase/ipc/TestRpcConnectionHeader.java  | 134 +++++++++++++++++++++
 3 files changed, 167 insertions(+)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index dbdb0e2037f..bee5719d455 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import 
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
 import 
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -193,6 +195,25 @@ abstract class RpcConnection {
     return preamble;
   }
 
+  /**
+   * Returns the {@code conf} entries that start with {@link HConstants#CLIENT_HEADER_PREFIX} and
+   * are not set in {@code connectionAttributes}, so explicitly passed attributes are kept.
+   */
+  private Map<String, byte[]> getConfigurationConnectionAttributes() {
+    Map<String, byte[]> attributes = new HashMap<>();
+    for (Map.Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+      boolean isHeader = key.startsWith(HConstants.CLIENT_HEADER_PREFIX);
+      // Tolerate a null attribute map: then no key can clash with an explicit attribute.
+      if (isHeader && (connectionAttributes == null || !connectionAttributes.containsKey(key))) {
+        String value = entry.getValue();
+        LOG.debug("Adding connection header: {}={}", key, value);
+        attributes.put(key, Bytes.toBytes(value));
+      }
+    }
+    return attributes;
+  }
+
   protected final ConnectionHeader getConnectionHeader() {
     final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
     builder.setServiceName(remoteId.getServiceName());
@@ -214,6 +235,12 @@ abstract class RpcConnection {
         builder.addAttribute(attributeBuilder.build());
       }
     }
+    for (Map.Entry<String, byte[]> attribute : 
getConfigurationConnectionAttributes().entrySet()) {
+      HBaseProtos.NameBytesPair.Builder attributeBuilder = 
HBaseProtos.NameBytesPair.newBuilder();
+      attributeBuilder.setName(attribute.getKey());
+      
attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+      builder.addAttribute(attributeBuilder.build());
+    }
     builder.setVersionInfo(ProtobufUtil.getVersionInfo());
     boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, 
CRYPTO_AES_ENABLED_DEFAULT);
     // if Crypto AES enable, setup Cipher transformation
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1051686d32e..9af711e7edf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1503,6 +1503,12 @@ public final class HConstants {
   public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
     "hbase.zookeeper.server.kerberos.principal";
 
+  /**
+   * Prefix of the client configuration keys whose entries are sent to the server as attributes of
+   * the RPC ConnectionHeader, which makes them accessible on the server side.
+   */
+  public static final String CLIENT_HEADER_PREFIX = "hbase.client.header.";
+
   /** Config key for hbase temporary directory in hdfs */
   public static final String TEMPORARY_FS_DIRECTORY_KEY = "hbase.fs.tmp.dir";
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcConnectionHeader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcConnectionHeader.java
new file mode 100644
index 00000000000..512501be82a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcConnectionHeader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+
+/** Tests the attributes that {@link RpcConnection} puts into the RPC ConnectionHeader. */
+@Category(SmallTests.class)
+public class TestRpcConnectionHeader {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRpcConnectionHeader.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private final ConnectionId connectionId;
+
+  public TestRpcConnectionHeader() throws IOException {
+    User user = User.createUserForTesting(HBaseConfiguration.create(), "test", new String[] {});
+    String serviceName = MasterService.getDescriptor().getName();
+    connectionId = new ConnectionId(user, serviceName, Address.fromParts("localhost", 12345));
+  }
+
+  /** Connection stub with no-op transport methods; the tests only build its header. */
+  private class MyRpcConnection extends RpcConnection {
+    protected MyRpcConnection(Configuration conf, Map<String, byte[]> connectionAttributes)
+      throws IOException {
+      super(conf, null, connectionId, "cluster-id", false, null, null, null, null,
+        connectionAttributes);
+    }
+
+    @Override
+    protected void callTimeout(Call call) {
+    }
+
+    @Override
+    public boolean isActive() {
+      return false;
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public void sendRequest(Call call, HBaseRpcController hrc) throws IOException {
+    }
+
+    @Override
+    public void cleanupConnection() {
+    }
+  }
+
+  /** Without explicit attributes or header configuration, no attribute is sent. */
+  @Test
+  public void testEmptyHeaders() throws IOException {
+    Configuration configuration = HBaseConfiguration.create();
+    MyRpcConnection connection = new MyRpcConnection(configuration, Map.of());
+    RPCProtos.ConnectionHeader connectionHeader = connection.getConnectionHeader();
+
+    assertEquals(0, connectionHeader.getAttributeCount());
+  }
+
+  /** A configuration entry with the header prefix is sent as an attribute. */
+  @Test
+  public void testConfigHeaders() throws IOException {
+    Configuration configuration = HBaseConfiguration.create();
+    configuration.set("hbase.client.header.test", "true");
+    MyRpcConnection connection = new MyRpcConnection(configuration, Map.of());
+    RPCProtos.ConnectionHeader connectionHeader = connection.getConnectionHeader();
+
+    assertEquals(1, connectionHeader.getAttributeCount());
+    HBaseProtos.NameBytesPair attribute = connectionHeader.getAttribute(0);
+    assertEquals("hbase.client.header.test", attribute.getName());
+    assertEquals("true", attribute.getValue().toStringUtf8());
+  }
+
+  /** An explicit attribute wins over a configuration entry that uses the same key. */
+  @Test
+  public void testConfigHeadersNoOverride() throws IOException {
+    Configuration configuration = HBaseConfiguration.create();
+    configuration.set("hbase.client.header.test", "true");
+    configuration.set("hbase.client.header.test2", "true");
+    Map<String, byte[]> attributes = Map.of("hbase.client.header.test", Bytes.toBytes("false"));
+
+    MyRpcConnection connection = new MyRpcConnection(configuration, attributes);
+    RPCProtos.ConnectionHeader connectionHeader = connection.getConnectionHeader();
+
+    assertEquals(2, connectionHeader.getAttributeCount());
+    // The explicit attribute is expected first, then the remaining configuration entry.
+    HBaseProtos.NameBytesPair attribute0 = connectionHeader.getAttribute(0);
+    assertEquals("hbase.client.header.test", attribute0.getName());
+    assertEquals("false", attribute0.getValue().toStringUtf8());
+
+    HBaseProtos.NameBytesPair attribute1 = connectionHeader.getAttribute(1);
+    assertEquals("hbase.client.header.test2", attribute1.getName());
+    assertEquals("true", attribute1.getValue().toStringUtf8());
+  }
+}

Reply via email to