Repository: incubator-s2graph Updated Branches: refs/heads/master d9a7860b0 -> 44c319fa7
[S2GRAPH-25]: Support Secure HBase Cluster. add kerberos support. JIRA: [S2GRAPH-25] https://issues.apache.org/jira/browse/S2GRAPH-25 Pull Request: Closes #3 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/44c319fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/44c319fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/44c319fa Branch: refs/heads/master Commit: 44c319fa708a89e694595b49ed8ef334ba631ad5 Parents: d9a7860 Author: DO YUNG YOON <[email protected]> Authored: Fri Feb 26 11:20:54 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Feb 26 11:20:54 2016 +0900 ---------------------------------------------------------------------- .../core/storage/hbase/AsynchbaseStorage.scala | 70 ++++++++++++++++++-- .../storage/hbase/AsynchbaseStorageTest.scala | 36 ++++++++++ 2 files changed, 100 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/44c319fa/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 9f6e91f..a83aacd 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -12,6 +12,7 @@ import com.kakao.s2graph.core.types._ import com.kakao.s2graph.core.utils.{Extensions, logger} import com.stumbleupon.async.Deferred import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding @@ -19,8 +20,8 @@ import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.hadoop.security.UserGroupInformation import org.hbase.async._ - import scala.collection.JavaConversions._ import scala.collection.Seq import scala.concurrent.duration.Duration @@ -36,7 +37,17 @@ object AsynchbaseStorage { def makeClient(config: Config, overrideKv: (String, String)*) = { - val asyncConfig: org.hbase.async.Config = new org.hbase.async.Config() + val asyncConfig: org.hbase.async.Config = + if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { + val krb5Conf = config.getString("java.security.krb5.conf") + val jaas = config.getString("java.security.auth.login.config") + + System.setProperty("java.security.krb5.conf", krb5Conf) + System.setProperty("java.security.auth.login.config", jaas) + new org.hbase.async.Config() + } else { + new org.hbase.async.Config() + } for (entry <- config.entrySet() if entry.getKey.contains("hbase")) { asyncConfig.overrideConfig(entry.getKey, entry.getValue.unwrapped().toString) @@ -836,14 +847,61 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, } } + private def getSecureClusterAdmin(zkAddr: String) = { + val jaas = config.getString("java.security.auth.login.config") + val krb5Conf = config.getString("java.security.krb5.conf") + val realm = config.getString("realm") + val principal = config.getString("principal") + val keytab = config.getString("keytab") - private def getAdmin(zkAddr: String) = { - val conf = HBaseConfiguration.create() - conf.set("hbase.zookeeper.quorum", zkAddr) - val conn = ConnectionFactory.createConnection(conf) + + System.setProperty("java.security.auth.login.config", jaas) + System.setProperty("java.security.krb5.conf", krb5Conf) + // System.setProperty("sun.security.krb5.debug", "true") + // System.setProperty("sun.security.spnego.debug", "true") + val conf = new Configuration(true) + val hConf = HBaseConfiguration.create(conf) + + hConf.set("hbase.zookeeper.quorum", zkAddr) + + hConf.set("hadoop.security.authentication", "Kerberos") + hConf.set("hbase.security.authentication", "Kerberos") + hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm) + hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm) + + System.out.println("Connecting secure cluster, using keytab\n") + UserGroupInformation.setConfiguration(hConf) + UserGroupInformation.loginUserFromKeytab(principal, keytab) + val currentUser = UserGroupInformation.getCurrentUser() + System.out.println("current user : " + currentUser + "\n") + + // get table list + val conn = ConnectionFactory.createConnection(hConf) conn.getAdmin } + /** + * following configuration need to come together to use secured hbase cluster. + * 1. set hbase.security.auth.enable = true + * 2. set file path to jaas file java.security.auth.login.config + * 3. set file path to kerberos file java.security.krb5.conf + * 4. set realm + * 5. set principal + * 6. set file path to keytab + * @param zkAddr + * @return + */ + private def getAdmin(zkAddr: String) = { + if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { + getSecureClusterAdmin(zkAddr) + } else { + val conf = HBaseConfiguration.create() + conf.set("hbase.zookeeper.quorum", zkAddr) + val conn = ConnectionFactory.createConnection(conf) + conn.getAdmin + } + } + private def enableTable(zkAddr: String, tableName: String) = { getAdmin(zkAddr).enableTable(TableName.valueOf(tableName)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/44c319fa/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala new file mode 100644 index 0000000..87a0dcb --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorageTest.scala @@ -0,0 +1,36 @@ +package com.kakao.s2graph.core.storage.hbase + +import com.typesafe.config.ConfigFactory +import org.hbase.async.{GetRequest, PutRequest} +import org.scalatest.{Matchers, FunSuite} +import scala.collection.JavaConversions._ + +class AsynchbaseStorageTest extends FunSuite with Matchers { + + /** need secured cluster */ +// test("test secure cluster connection") { +// val config = ConfigFactory.parseMap( +// Map( +// "hbase.zookeeper.quorum" -> "localhost", +// "hbase.security.auth.enable" -> "true", +// "hbase.security.authentication" -> "kerberos", +// "hbase.kerberos.regionserver.principal" -> "hbase/[email protected]", +// "hbase.sasl.clientconfig" -> "Client", +// "java.security.krb5.conf" -> "krb5.conf", +// "java.security.auth.login.config" -> "async-client.jaas.conf") +// ) +// +// val client = AsynchbaseStorage.makeClient(config) +// val table = "test".getBytes() +// +// val putRequest = new PutRequest(table, "a".getBytes(), "e".getBytes, "a".getBytes, "a".getBytes) +// val getRequest = new GetRequest(table, "a".getBytes(), "e".getBytes) +// val ret = client.put(putRequest).join() +// val kvs = client.get(getRequest).join() +// for { +// kv <- kvs +// } { +// println(kv.toString) +// } +// } +} \ No newline at end of file
