This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e16e719cd [HUDI-3980] Support kerberos hbase index (#5464)
6e16e719cd is described below

commit 6e16e719cd614329018cd34a7c57d342fe2fa376
Author: xi chaomin <[email protected]>
AuthorDate: Sat May 14 19:37:31 2022 +0800

    [HUDI-3980] Support kerberos hbase index (#5464)
    
    - Add configurations in HoodieHBaseIndexConfig.java to support kerberos 
hbase connection.
    
    Co-authored-by: xicm <[email protected]>
---
 .../apache/hudi/config/HoodieHBaseIndexConfig.java | 52 ++++++++++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 20 +++++++++
 .../hudi/index/hbase/SparkHoodieHBaseIndex.java    | 27 +++++++++--
 3 files changed, 96 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 3d7e3a7941..2389aa7fc1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -157,6 +157,33 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       .withDocumentation("When set to true, the rollback method will delete 
the last failed task index. "
           + "The default value is false. Because deleting the index will add 
extra load on the Hbase cluster for each rollback");
 
+  public static final ConfigProperty<String> SECURITY_AUTHENTICATION = 
ConfigProperty
+      .key("hoodie.index.hbase.security.authentication")
+      .defaultValue("simple")
+      .withDocumentation("Property to decide if the hbase cluster secure 
authentication is enabled or not. "
+          + "Possible values are 'simple' (no authentication), and 
'kerberos'.");
+
+  public static final ConfigProperty<String> KERBEROS_USER_KEYTAB = 
ConfigProperty
+      .key("hoodie.index.hbase.kerberos.user.keytab")
+      .noDefaultValue()
+      .withDocumentation("File name of the kerberos keytab file for connecting 
to the hbase cluster.");
+
+  public static final ConfigProperty<String> KERBEROS_USER_PRINCIPAL = 
ConfigProperty
+      .key("hoodie.index.hbase.kerberos.user.principal")
+      .noDefaultValue()
+      .withDocumentation("The kerberos principal name for connecting to the 
hbase cluster.");
+
+  public static final ConfigProperty<String> REGIONSERVER_PRINCIPAL = 
ConfigProperty
+      .key("hoodie.index.hbase.regionserver.kerberos.principal")
+      .noDefaultValue()
+      .withDocumentation("The value of hbase.regionserver.kerberos.principal 
in hbase cluster.");
+
+  public static final ConfigProperty<String> MASTER_PRINCIPAL = ConfigProperty
+      .key("hoodie.index.hbase.master.kerberos.principal")
+      .noDefaultValue()
+      .withDocumentation("The value of hbase.master.kerberos.principal in 
hbase cluster.");
+
+
   /**
    * @deprecated Use {@link #ZKQUORUM} and its methods instead
    */
@@ -444,6 +471,31 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder hbaseSecurityAuthentication(String authentication) {
+      hBaseIndexConfig.setValue(SECURITY_AUTHENTICATION, authentication);
+      return this;
+    }
+
+    public Builder hbaseKerberosUserKeytab(String keytab) {
+      hBaseIndexConfig.setValue(KERBEROS_USER_KEYTAB, keytab);
+      return this;
+    }
+
+    public Builder hbaseKerberosUserPrincipal(String principal) {
+      hBaseIndexConfig.setValue(KERBEROS_USER_PRINCIPAL, principal);
+      return this;
+    }
+
+    public Builder hbaseKerberosRegionserverPrincipal(String principal) {
+      hBaseIndexConfig.setValue(REGIONSERVER_PRINCIPAL, principal);
+      return this;
+    }
+
+    public Builder hbaseKerberosMasterPrincipal(String principal) {
+      hBaseIndexConfig.setValue(MASTER_PRINCIPAL, principal);
+      return this;
+    }
+
     /**
      * <p>
      * Method to set maximum QPS allowed per Region Server. This should be 
same across various jobs. This is intended to
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7b49a7a466..322c2e84e7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1488,6 +1488,26 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieHBaseIndexConfig.COMPUTE_QPS_DYNAMICALLY);
   }
 
+  public String getHBaseIndexSecurityAuthentication() {
+    return getString(HoodieHBaseIndexConfig.SECURITY_AUTHENTICATION);
+  }
+
+  public String getHBaseIndexKerberosUserKeytab() {
+    return getString(HoodieHBaseIndexConfig.KERBEROS_USER_KEYTAB);
+  }
+
+  public String getHBaseIndexKerberosUserPrincipal() {
+    return getString(HoodieHBaseIndexConfig.KERBEROS_USER_PRINCIPAL);
+  }
+
+  public String getHBaseIndexRegionserverPrincipal() {
+    return getString(HoodieHBaseIndexConfig.REGIONSERVER_PRINCIPAL);
+  }
+
+  public String getHBaseIndexMasterPrincipal() {
+    return getString(HoodieHBaseIndexConfig.MASTER_PRINCIPAL);
+  }
+
   public int getHBaseIndexDesiredPutsTime() {
     return getInt(HoodieHBaseIndexConfig.DESIRED_PUTS_TIME_IN_SECONDS);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index fc73a0aed7..f841117d5c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -42,7 +42,6 @@ import 
org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -60,10 +59,12 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -72,6 +73,7 @@ import org.joda.time.DateTime;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -150,9 +152,28 @@ public class SparkHoodieHBaseIndex extends 
HoodieIndex<Object, Object> {
     }
     String port = String.valueOf(config.getHbaseZkPort());
     hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
+
     try {
-      return ConnectionFactory.createConnection(hbaseConfig);
-    } catch (IOException e) {
+      String authentication = config.getHBaseIndexSecurityAuthentication();
+      if (authentication.equals("kerberos")) {
+        hbaseConfig.set("hbase.security.authentication", "kerberos");
+        hbaseConfig.set("hadoop.security.authentication", "kerberos");
+        hbaseConfig.set("hbase.security.authorization", "true");
+        hbaseConfig.set("hbase.regionserver.kerberos.principal", 
config.getHBaseIndexRegionserverPrincipal());
+        hbaseConfig.set("hbase.master.kerberos.principal", 
config.getHBaseIndexMasterPrincipal());
+
+        String principal = config.getHBaseIndexKerberosUserPrincipal();
+        String keytab = 
SparkFiles.get(config.getHBaseIndexKerberosUserKeytab());
+
+        UserGroupInformation.setConfiguration(hbaseConfig);
+        UserGroupInformation ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+        return ugi.doAs((PrivilegedExceptionAction<Connection>) () ->
+          (Connection) ConnectionFactory.createConnection(hbaseConfig)
+        );
+      } else {
+        return ConnectionFactory.createConnection(hbaseConfig);
+      }
+    } catch (IOException | InterruptedException e) {
       throw new 
HoodieDependentSystemUnavailableException(HoodieDependentSystemUnavailableException.HBASE,
           quorum + ":" + port, e);
     }

Reply via email to