Repository: incubator-s2graph
Updated Branches:
  refs/heads/master d9a7860b0 -> 44c319fa7


[S2GRAPH-25]: Support Secure HBase Cluster.

  add kerberos support.

JIRA:
  [S2GRAPH-25] https://issues.apache.org/jira/browse/S2GRAPH-25

Pull Request:
  Closes #3


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/44c319fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/44c319fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/44c319fa

Branch: refs/heads/master
Commit: 44c319fa708a89e694595b49ed8ef334ba631ad5
Parents: d9a7860
Author: DO YUNG YOON <[email protected]>
Authored: Fri Feb 26 11:20:54 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Feb 26 11:20:54 2016 +0900

----------------------------------------------------------------------
 .../core/storage/hbase/AsynchbaseStorage.scala  | 70 ++++++++++++++++++--
 .../storage/hbase/AsynchbaseStorageTest.scala   | 36 ++++++++++
 2 files changed, 100 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/44c319fa/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 9f6e91f..a83aacd 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -12,6 +12,7 @@ import com.kakao.s2graph.core.types._
 import com.kakao.s2graph.core.utils.{Extensions, logger}
 import com.stumbleupon.async.Deferred
 import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
@@ -19,8 +20,8 @@ import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.hadoop.security.UserGroupInformation
 import org.hbase.async._
-
 import scala.collection.JavaConversions._
 import scala.collection.Seq
 import scala.concurrent.duration.Duration
@@ -36,7 +37,17 @@ object AsynchbaseStorage {
 
 
   def makeClient(config: Config, overrideKv: (String, String)*) = {
-    val asyncConfig: org.hbase.async.Config = new org.hbase.async.Config()
+    val asyncConfig: org.hbase.async.Config =
+      if (config.hasPath("hbase.security.auth.enable") && 
config.getBoolean("hbase.security.auth.enable")) {
+        val krb5Conf = config.getString("java.security.krb5.conf")
+        val jaas = config.getString("java.security.auth.login.config")
+
+        System.setProperty("java.security.krb5.conf", krb5Conf)
+        System.setProperty("java.security.auth.login.config", jaas)
+        new org.hbase.async.Config()
+      } else {
+        new org.hbase.async.Config()
+      }
 
     for (entry <- config.entrySet() if entry.getKey.contains("hbase")) {
       asyncConfig.overrideConfig(entry.getKey, 
entry.getValue.unwrapped().toString)
@@ -836,14 +847,61 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
     }
   }
 
+  private def getSecureClusterAdmin(zkAddr: String) = {
+    val jaas = config.getString("java.security.auth.login.config")
+    val krb5Conf = config.getString("java.security.krb5.conf")
+    val realm = config.getString("realm")
+    val principal = config.getString("principal")
+    val keytab = config.getString("keytab")
 
-  private def getAdmin(zkAddr: String) = {
-    val conf = HBaseConfiguration.create()
-    conf.set("hbase.zookeeper.quorum", zkAddr)
-    val conn = ConnectionFactory.createConnection(conf)
+
+    System.setProperty("java.security.auth.login.config", jaas)
+    System.setProperty("java.security.krb5.conf", krb5Conf)
+    // System.setProperty("sun.security.krb5.debug", "true")
+    // System.setProperty("sun.security.spnego.debug", "true")
+    val conf = new Configuration(true)
+    val hConf = HBaseConfiguration.create(conf)
+
+    hConf.set("hbase.zookeeper.quorum", zkAddr)
+
+    hConf.set("hadoop.security.authentication", "Kerberos")
+    hConf.set("hbase.security.authentication", "Kerberos")
+    hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm)
+    hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm)
+
+    System.out.println("Connecting secure cluster, using keytab\n")
+    UserGroupInformation.setConfiguration(hConf)
+    UserGroupInformation.loginUserFromKeytab(principal, keytab)
+    val currentUser = UserGroupInformation.getCurrentUser()
+    System.out.println("current user : " + currentUser + "\n")
+
+    // get table list
+    val conn = ConnectionFactory.createConnection(hConf)
     conn.getAdmin
   }
 
+  /**
+   * following configuration need to come together to use secured hbase 
cluster.
+   * 1. set hbase.security.auth.enable = true
+   * 2. set file path to jaas file java.security.auth.login.config
+   * 3. set file path to kerberos file java.security.krb5.conf
+   * 4. set realm
+   * 5. set principal
+   * 6. set file path to keytab
+   * @param zkAddr
+   * @return
+   */
+  private def getAdmin(zkAddr: String) = {
+    if (config.hasPath("hbase.security.auth.enable") && 
config.getBoolean("hbase.security.auth.enable")) {
+      getSecureClusterAdmin(zkAddr)
+    } else {
+      val conf = HBaseConfiguration.create()
+      conf.set("hbase.zookeeper.quorum", zkAddr)
+      val conn = ConnectionFactory.createConnection(conf)
+      conn.getAdmin
+    }
+  }
+
   private def enableTable(zkAddr: String, tableName: String) = {
     getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/44c319fa/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
 
b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
new file mode 100644
index 0000000..87a0dcb
--- /dev/null
+++ 
b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala
@@ -0,0 +1,36 @@
+package com.kakao.s2graph.core.storage.hbase
+
+import com.typesafe.config.ConfigFactory
+import org.hbase.async.{GetRequest, PutRequest}
+import org.scalatest.{Matchers, FunSuite}
+import scala.collection.JavaConversions._
+
+class AsynchbaseStorageTest extends FunSuite with Matchers {
+
+  /** need secured cluster */
+//  test("test secure cluster connection") {
+//    val config = ConfigFactory.parseMap(
+//      Map(
+//        "hbase.zookeeper.quorum" -> "localhost",
+//        "hbase.security.auth.enable" -> "true",
+//        "hbase.security.authentication" -> "kerberos",
+//        "hbase.kerberos.regionserver.principal" -> "hbase/[email protected]",
+//        "hbase.sasl.clientconfig" -> "Client",
+//        "java.security.krb5.conf" -> "krb5.conf",
+//        "java.security.auth.login.config" -> "async-client.jaas.conf")
+//    )
+//
+//    val client = AsynchbaseStorage.makeClient(config)
+//    val table = "test".getBytes()
+//
+//    val putRequest = new PutRequest(table, "a".getBytes(), "e".getBytes, 
"a".getBytes, "a".getBytes)
+//    val getRequest = new GetRequest(table, "a".getBytes(), "e".getBytes)
+//    val ret = client.put(putRequest).join()
+//    val kvs = client.get(getRequest).join()
+//    for {
+//      kv <- kvs
+//    } {
+//      println(kv.toString)
+//    }
+//  }
+}
\ No newline at end of file

Reply via email to