Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d034fc56d -> 5cce13add


[GEARPUMP-373] Don't create hbase configuration at client side

Author: manuzhang <[email protected]>

Closes #244 from manuzhang/hbase.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5cce13ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5cce13ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5cce13ad

Branch: refs/heads/master
Commit: 5cce13addd646d144748f12c48010de0c0b9d02b
Parents: d034fc5
Author: manuzhang <[email protected]>
Authored: Sun Apr 29 19:11:02 2018 +0800
Committer: manuzhang <[email protected]>
Committed: Sun Apr 29 19:11:17 2018 +0800

----------------------------------------------------------------------
 .../gearpump/external/hbase/HBaseSink.scala     | 143 +++++++++----------
 .../external/hbase/dsl/HBaseDSLSink.scala       |  11 +-
 .../gearpump/external/hbase/HBaseSinkSpec.scala |  41 +++++-
 3 files changed, 105 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cce13ad/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
index f5e6483..7865c5a 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -17,116 +17,52 @@
  */
 package org.apache.gearpump.external.hbase
 
-import java.io.{File, ObjectInputStream, ObjectOutputStream}
+import java.io.File
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.external.hbase.HBaseSink.HBaseWriterFactory
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.{Constants, FileUtils}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, 
Table}
 import org.apache.hadoop.hbase.security.UserProvider
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.hadoop.security.UserGroupInformation
 
-class HBaseSink(userConfig: UserConfig, tableName: String,
-    val conn: (UserConfig, Configuration)
-    => Connection, @transient var configuration: Configuration)
-  extends DataSink {
+class HBaseSink private[hbase](
+    userConfig: UserConfig, tableName: String, factory: HBaseWriterFactory) 
extends DataSink {
 
-  lazy val connection = conn(userConfig, configuration)
-  lazy val table = connection.getTable(TableName.valueOf(tableName))
-
-  override def open(context: TaskContext): Unit = {}
-
-  def this(userConfig: UserConfig, tableName: String, configuration: 
Configuration) = {
-    this(userConfig, tableName, HBaseSink.getConnection, configuration)
-  }
+  private lazy val hbaseWriter = factory.getHBaseWriter(userConfig, tableName)
 
   def this(userConfig: UserConfig, tableName: String) = {
-    this(userConfig, tableName, HBaseConfiguration.create())
-  }
-
-  def insert(rowKey: String, columnGroup: String, columnName: String, value: 
String): Unit = {
-    insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup),
-      Bytes.toBytes(columnName), Bytes.toBytes(value))
+    this(userConfig, tableName, new HBaseWriterFactory)
   }
 
-  def insert(
-      rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], 
value: Array[Byte])
-  : Unit = {
-    val put = new Put(rowKey)
-    put.addColumn(columnGroup, columnName, value)
-    table.put(put)
-  }
+  override def open(context: TaskContext): Unit = {}
 
-  def put(msg: Any): Unit = {
-    msg match {
-      case seq: Seq[Any] =>
-        seq.foreach(put)
-      case tuple: (_, _, _, _) => {
-        tuple._1 match {
-          case str: String => {
-            insert(tuple._1.asInstanceOf[String], 
tuple._2.asInstanceOf[String],
-              tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String])
-          }
-          case byteArray: Array[Byte@unchecked] => {
-            insert(tuple._1.asInstanceOf[Array[Byte]], 
tuple._2.asInstanceOf[Array[Byte]],
-              tuple._3.asInstanceOf[Array[Byte]], 
tuple._4.asInstanceOf[Array[Byte]])
-          }
-          case _ =>
-          // Skip
-        }
-      }
-    }
-  }
 
   override def write(message: Message): Unit = {
-    put(message.value)
-  }
-
-  def close(): Unit = {
-    table.close()
-    connection.close()
+    hbaseWriter.put(message.value)
   }
 
-  /**
-   * Overrides Java's default serialization
-   * Please do not remove this
-   */
-  private def writeObject(out: ObjectOutputStream): Unit = {
-    out.defaultWriteObject()
-    configuration.write(out)
+  override def close(): Unit = {
+    hbaseWriter.close()
   }
 
-  /**
-   * Overrides Java's default deserialization
-   * Please do not remove this
-   */
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    val clientConf = new Configuration(false)
-    clientConf.readFields(in)
-    configuration = HBaseConfiguration.create(clientConf)
-  }
 }
 
 object HBaseSink {
+
   val HBASESINK = "hbasesink"
   val TABLE_NAME = "hbase.table.name"
   val COLUMN_FAMILY = "hbase.table.column.family"
   val COLUMN_NAME = "hbase.table.column.name"
   val HBASE_USER = "hbase.user"
 
-  def apply[T](userConfig: UserConfig, tableName: String, configuration: 
Configuration)
-  : HBaseSink = {
-    new HBaseSink(userConfig, tableName, configuration)
-  }
-
-  def apply[T](userConfig: UserConfig, tableName: String)
-  : HBaseSink = {
+  def apply[T](userConfig: UserConfig, tableName: String): HBaseSink = {
     new HBaseSink(userConfig, tableName)
   }
 
@@ -159,6 +95,59 @@ object HBaseSink {
         .create(UserGroupInformation.createRemoteUser(userName.get))
       ConnectionFactory.createConnection(configuration, user)
     }
+  }
+
+  class HBaseWriterFactory extends java.io.Serializable {
+
+    def getHBaseWriter(userConfig: UserConfig, tableName: String): HBaseWriter 
= {
+      new HBaseWriter(userConfig, tableName)
+    }
+  }
+
+  class HBaseWriter(connection: Connection, tableName: String) {
+
+    private val table: Table = 
connection.getTable(TableName.valueOf(tableName))
+
+    def this(userConfig: UserConfig, tableName: String) = {
+      this(getConnection(userConfig, HBaseConfiguration.create()), tableName)
+    }
+
+    def insert(rowKey: String, columnGroup: String, columnName: String, value: 
String): Unit = {
+      insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup),
+        Bytes.toBytes(columnName), Bytes.toBytes(value))
+    }
+
+    def insert(
+        rowKey: Array[Byte], columnGroup: Array[Byte], columnName: 
Array[Byte], value: Array[Byte])
+    : Unit = {
+      val put = new Put(rowKey)
+      put.addColumn(columnGroup, columnName, value)
+      table.put(put)
+    }
+
+    def put(msg: Any): Unit = {
+      msg match {
+        case seq: Seq[Any] =>
+          seq.foreach(put)
+        case tuple: (_, _, _, _) =>
+          tuple._1 match {
+            case _: String =>
+              insert(tuple._1.asInstanceOf[String], 
tuple._2.asInstanceOf[String],
+                tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String])
+            case _: Array[Byte@unchecked] =>
+              insert(tuple._1.asInstanceOf[Array[Byte]], 
tuple._2.asInstanceOf[Array[Byte]],
+                tuple._3.asInstanceOf[Array[Byte]], 
tuple._4.asInstanceOf[Array[Byte]])
+            case _ =>
+            // Skip
+          }
+      }
+    }
+
+    def close(): Unit = {
+      table.close()
+      connection.close()
+    }
+
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cce13ad/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 22efa89..aa0741d 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -18,7 +18,6 @@
 package org.apache.gearpump.external.hbase.dsl
 
 import scala.language.implicitConversions
-import org.apache.hadoop.conf.Configuration
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.external.hbase.HBaseSink
 import org.apache.gearpump.streaming.dsl.scalaapi.Stream
@@ -26,15 +25,11 @@ import org.apache.gearpump.streaming.dsl.scalaapi.Stream
 /** Create a HBase DSL Sink */
 class HBaseDSLSink[T](stream: Stream[T]) {
 
-  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, 
description: String)
-    : Stream[T] = {
-    stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, 
description)
+  def writeToHbase(userConfig: UserConfig, table: String,
+      parallelism: Int, description: String): Stream[T] = {
+    stream.sink(HBaseSink[T](userConfig, table), parallelism, userConfig, 
description)
   }
 
-  def writeToHbase(userConfig: UserConfig, configuration: Configuration, 
table: String,
-      parallism: Int, description: String): Stream[T] = {
-    stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, 
userConfig, description)
-  }
 }
 
 object HBaseDSLSink {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cce13ad/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
index 62da2b1..3dc9f43 100644
--- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
+++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
@@ -17,7 +17,10 @@
  */
 package org.apache.gearpump.external.hbase
 
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.external.hbase.HBaseSink.{HBaseWriter, 
HBaseWriterFactory}
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.hadoop.conf.Configuration
@@ -25,13 +28,42 @@ import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.util.Bytes
 import org.mockito.Mockito._
+import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
 class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
 
-  property("HBaseSink should insert a row successfully") {
+
+  property("HBaseSink should invoke HBaseWriter for writing message to HBase") 
{
+
+    val hbaseWriter = mock[HBaseWriter]
+    val hbaseWriterFactory = mock[HBaseWriterFactory]
+
+    implicit val system: ActorSystem = MockUtil.system
+
+    val userConfig = UserConfig.empty
+    val tableName = "hbase"
+
+    when(hbaseWriterFactory.getHBaseWriter(userConfig, tableName))
+      .thenReturn(hbaseWriter)
+
+    val hbaseSink = new HBaseSink(userConfig, tableName, hbaseWriterFactory)
+
+    hbaseSink.open(MockUtil.mockTaskContext)
+
+    forAll(Gen.alphaStr) { (value: String) =>
+      val message = Message(value)
+      hbaseSink.write(message)
+      verify(hbaseWriter, atLeastOnce()).put(value)
+    }
+
+    hbaseSink.close()
+    verify(hbaseWriter).close()
+  }
+
+  property("HBaseWriter should insert a row successfully") {
 
     val table = mock[Table]
     val config = mock[Configuration]
@@ -54,10 +86,9 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock
 
     val put = new Put(Bytes.toBytes(row))
     put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), 
Bytes.toBytes(value))
-    val hbaseSink = new HBaseSink(userConfig, tableName, (userConfig, config)
-    => connection, config)
-    hbaseSink.open(taskContext)
-    hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), 
Bytes.toBytes(name),
+
+    val hbaseWriter = new HBaseWriter(connection, tableName)
+    hbaseWriter.insert(Bytes.toBytes(row), Bytes.toBytes(group), 
Bytes.toBytes(name),
       Bytes.toBytes(value))
 
     verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements 
Bytes.toBytes(row)))

Reply via email to