Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6f153e806 -> 2fe4abd72


GEARPUMP-164 configurable hbase user

Simple change that allows configuring the user for connection to HBase.
Tested on YARN deployment.

(PR replacing messy https://github.com/apache/incubator-gearpump/pull/44)

Author: karol brejna <[email protected]>

Closes #49 from karol-brejna-i/GEARPUMP-164-hbase-user.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2fe4abd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2fe4abd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2fe4abd7

Branch: refs/heads/master
Commit: 2fe4abd72793b6c8af77893f47e728e92a6e04a5
Parents: 6f153e8
Author: karol brejna <[email protected]>
Authored: Thu Jun 23 16:40:46 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Jun 23 16:40:46 2016 +0800

----------------------------------------------------------------------
 external/hbase/README.md                             | 10 +++++++++-
 .../apache/gearpump/external/hbase/HBaseSink.scala   | 15 +++++++++++++--
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2fe4abd7/external/hbase/README.md
----------------------------------------------------------------------
diff --git a/external/hbase/README.md b/external/hbase/README.md
index 0e55a5d..9d8e18a 100644
--- a/external/hbase/README.md
+++ b/external/hbase/README.md
@@ -26,8 +26,16 @@ The HBase cluster should run on where Gearpump is deployed.
 Suppose HBase is installed at ```/usr/lib/hbase``` on every node and you 
already have your application built into a jar file. 
 Then before submitting the application, you need to add HBase lib folder and 
conf folder into ```gearpump.executor.extraClasspath``` in 
```conf/gear.conf```, for example 
```/usr/lib/hbase/lib/*:/usr/lib/hbase/conf```. 
 Please note only client side's configuration change is needed. After that, you 
are able to submit the application.
+
+
+## If you need to supply the HBase user for the connection
+There are HBase configurations that have authorization configured (some users 
are allowed to read/write to selected namespaces/tables, some are not).
+
+In this cases you may need to configure the user that connects to HBase.
+
+When creating HBase Sink you can pass UserConfig object. If the object 
contains "hbase.user" property, the value will be used as the user name for 
HBase connection.
  
-## Working with Secured HBASE
+## Working with Kerberized HBase
 
 When the remote HBase is security enabled, a kerberos keytab and the 
corresponding principal name need to be
 provided for the gearpump-hbase connector. Specifically, the UserConfig object 
passed into the HBaseSink should contain

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2fe4abd7/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index f3c5209..e4d5633 100644
--- 
a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
+import org.apache.hadoop.hbase.security.{User, UserProvider}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.gearpump.Message
@@ -82,8 +83,8 @@ class HBaseSink(
   }
 
   def close(): Unit = {
-    connection.close()
     table.close()
+    connection.close()
   }
 
   /**
@@ -112,6 +113,7 @@ object HBaseSink {
   val TABLE_NAME = "hbase.table.name"
   val COLUMN_FAMILY = "hbase.table.column.family"
   val COLUMN_NAME = "hbase.table.column.name"
+  val HBASE_USER = "hbase.user"
 
   def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = {
     new HBaseSink(userconfig, tableName)
@@ -142,6 +144,15 @@ object HBaseSink {
       UserGroupInformation.loginUserFromKeytab(principal.get, 
keytabFile.getAbsolutePath)
       keytabFile.delete()
     }
-    ConnectionFactory.createConnection(configuration)
+
+    val userName = userConfig.getString(HBASE_USER)
+    if (userName.isEmpty) {
+      ConnectionFactory.createConnection(configuration)
+    } else {
+      val user = UserProvider.instantiate(configuration)
+        .create(UserGroupInformation.createRemoteUser(userName.get))
+      ConnectionFactory.createConnection(configuration, user)
+    }
+
   }
 }

Reply via email to