This is an automated email from the ASF dual-hosted git repository.
meszibalu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new e0c7b80e565 HBASE-29765 Make client connection header attributes
configurable (#7553)
e0c7b80e565 is described below
commit e0c7b80e565cd6410f37eb81a475b3e811af82fe
Author: Balazs Meszaros <[email protected]>
AuthorDate: Mon Jan 12 16:38:20 2026 +0100
HBASE-29765 Make client connection header attributes configurable (#7553)
hbase.client.header.* configuration settings are passed to the server side
in RPC ConnectionHeader attributes.
Signed-off-by: Dávid Paksy <[email protected]>
---
.../org/apache/hadoop/hbase/ipc/RpcConnection.java | 27 +++++
.../java/org/apache/hadoop/hbase/HConstants.java | 6 +
.../hadoop/hbase/ipc/TestRpcConnectionHeader.java | 134 +++++++++++++++++++++
3 files changed, 167 insertions(+)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index dbdb0e2037f..bee5719d455 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -193,6 +195,25 @@ abstract class RpcConnection {
return preamble;
}
+  /**
+   * Collects the connection header attributes that are defined through the client configuration.
+   * Every configuration entry whose key starts with {@link HConstants#CLIENT_HEADER_PREFIX} is
+   * included, unless the same key is already present in the explicitly supplied
+   * {@code connectionAttributes}; explicit attributes therefore always win over configuration.
+   * @return a newly created map from attribute name to the value converted with
+   *         {@link Bytes#toBytes(String)}; empty when no configuration entry matches
+   */
+  private Map<String, byte[]> getConfigurationConnectionAttributes() {
+    Map<String, byte[]> attributes = new HashMap<>();
+
+    for (Map.Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+
+      if (!key.startsWith(HConstants.CLIENT_HEADER_PREFIX)) {
+        continue;
+      }
+
+      // Guard against a missing attribute map: only a non-null map can shadow a configured header.
+      if (connectionAttributes != null && connectionAttributes.containsKey(key)) {
+        continue;
+      }
+
+      String value = entry.getValue();
+
+      LOG.debug("Adding connection header: {}={}", key, value);
+      attributes.put(key, Bytes.toBytes(value));
+    }
+
+    return attributes;
+  }
+
protected final ConnectionHeader getConnectionHeader() {
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setServiceName(remoteId.getServiceName());
@@ -214,6 +235,12 @@ abstract class RpcConnection {
builder.addAttribute(attributeBuilder.build());
}
}
+ for (Map.Entry<String, byte[]> attribute :
getConfigurationConnectionAttributes().entrySet()) {
+ HBaseProtos.NameBytesPair.Builder attributeBuilder =
HBaseProtos.NameBytesPair.newBuilder();
+ attributeBuilder.setName(attribute.getKey());
+
attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+ builder.addAttribute(attributeBuilder.build());
+ }
builder.setVersionInfo(ProtobufUtil.getVersionInfo());
boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY,
CRYPTO_AES_ENABLED_DEFAULT);
// if Crypto AES enable, setup Cipher transformation
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1051686d32e..9af711e7edf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1503,6 +1503,12 @@ public final class HConstants {
public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
"hbase.zookeeper.server.kerberos.principal";
+ /**
+ * Configuration entries with this prefix are passed to the RPC ConnectionHeader and become
+ * accessible on the server side.
+ */
+ public static final String CLIENT_HEADER_PREFIX = "hbase.client.header.";
+
/** Config key for hbase temporary directory in hdfs */
public static final String TEMPORARY_FS_DIRECTORY_KEY = "hbase.fs.tmp.dir";
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcConnectionHeader.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcConnectionHeader.java
new file mode 100644
index 00000000000..512501be82a
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcConnectionHeader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+
+/**
+ * Verifies which attributes end up in the RPC {@code ConnectionHeader}: configuration entries
+ * starting with {@code hbase.client.header.} are added, and attributes passed explicitly to the
+ * connection are not overridden by configuration entries with the same name.
+ */
+@Category(SmallTests.class)
+public class TestRpcConnectionHeader {
+  // The class rule has to reference this test class itself, not another test.
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRpcConnectionHeader.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  /** Connection id shared by every connection created in this test. */
+  private final ConnectionId connectionId;
+
+  public TestRpcConnectionHeader() throws IOException {
+    User user = User.createUserForTesting(HBaseConfiguration.create(), "test", new String[] {});
+    String serviceName = MasterService.getDescriptor().getName();
+
+    connectionId = new ConnectionId(user, serviceName, Address.fromParts("localhost", 12345));
+  }
+
+  /**
+   * Minimal {@link RpcConnection} whose overridden methods do nothing; it exists only so that
+   * the test can call {@code getConnectionHeader()} on a concrete instance.
+   */
+  private class MyRpcConnection extends RpcConnection {
+    protected MyRpcConnection(Configuration conf, Map<String, byte[]> connectionAttributes)
+      throws IOException {
+      super(conf, null, connectionId, "cluster-id", false, null, null, null, null,
+        connectionAttributes);
+    }
+
+    @Override
+    protected void callTimeout(Call call) {
+    }
+
+    @Override
+    public boolean isActive() {
+      return false;
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public void sendRequest(Call call, HBaseRpcController hrc) throws IOException {
+    }
+
+    @Override
+    public void cleanupConnection() {
+    }
+  }
+
+  /** Without header configuration and without explicit attributes the header has none. */
+  @Test
+  public void testEmptyHeaders() throws IOException {
+    Configuration configuration = HBaseConfiguration.create();
+
+    MyRpcConnection connection = new MyRpcConnection(configuration, Map.of());
+    RPCProtos.ConnectionHeader connectionHeader = connection.getConnectionHeader();
+
+    assertEquals(0, connectionHeader.getAttributeCount());
+  }
+
+  /** A single prefixed configuration entry shows up as a single header attribute. */
+  @Test
+  public void testConfigHeaders() throws IOException {
+    Configuration configuration = HBaseConfiguration.create();
+    configuration.set("hbase.client.header.test", "true");
+
+    MyRpcConnection connection = new MyRpcConnection(configuration, Map.of());
+    RPCProtos.ConnectionHeader connectionHeader = connection.getConnectionHeader();
+
+    assertEquals(1, connectionHeader.getAttributeCount());
+
+    HBaseProtos.NameBytesPair attribute = connectionHeader.getAttribute(0);
+    assertEquals("hbase.client.header.test", attribute.getName());
+    assertEquals("true", attribute.getValue().toStringUtf8());
+  }
+
+  /**
+   * An explicitly supplied attribute keeps its value when the configuration defines the same
+   * key, while a configuration-only key is still added after it.
+   */
+  @Test
+  public void testConfigHeadersNoOverride() throws IOException {
+    Configuration configuration = HBaseConfiguration.create();
+    configuration.set("hbase.client.header.test", "true");
+    configuration.set("hbase.client.header.test2", "true");
+
+    Map<String, byte[]> attributes = Map.of("hbase.client.header.test", Bytes.toBytes("false"));
+
+    MyRpcConnection connection = new MyRpcConnection(configuration, attributes);
+    RPCProtos.ConnectionHeader connectionHeader = connection.getConnectionHeader();
+
+    assertEquals(2, connectionHeader.getAttributeCount());
+
+    // The explicit attribute comes first and retains the caller's value.
+    HBaseProtos.NameBytesPair attribute0 = connectionHeader.getAttribute(0);
+    assertEquals("hbase.client.header.test", attribute0.getName());
+    assertEquals("false", attribute0.getValue().toStringUtf8());
+
+    HBaseProtos.NameBytesPair attribute1 = connectionHeader.getAttribute(1);
+    assertEquals("hbase.client.header.test2", attribute1.getName());
+    assertEquals("true", attribute1.getValue().toStringUtf8());
+  }
+}