Repository: incubator-gearpump Updated Branches: refs/heads/master d034fc56d -> 5cce13add
[GEARPUMP-373] Don't create hbase configuration at client side Author: manuzhang <[email protected]> Closes #244 from manuzhang/hbase. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5cce13ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5cce13ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5cce13ad Branch: refs/heads/master Commit: 5cce13addd646d144748f12c48010de0c0b9d02b Parents: d034fc5 Author: manuzhang <[email protected]> Authored: Sun Apr 29 19:11:02 2018 +0800 Committer: manuzhang <[email protected]> Committed: Sun Apr 29 19:11:17 2018 +0800 ---------------------------------------------------------------------- .../gearpump/external/hbase/HBaseSink.scala | 143 +++++++++---------- .../external/hbase/dsl/HBaseDSLSink.scala | 11 +- .../gearpump/external/hbase/HBaseSinkSpec.scala | 41 +++++- 3 files changed, 105 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cce13ad/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index f5e6483..7865c5a 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -17,116 +17,52 @@ */ package org.apache.gearpump.external.hbase -import java.io.{File, ObjectInputStream, ObjectOutputStream} +import java.io.File import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.external.hbase.HBaseSink.HBaseWriterFactory import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.{Constants, FileUtils} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table} import org.apache.hadoop.hbase.security.UserProvider import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.hadoop.security.UserGroupInformation -class HBaseSink(userConfig: UserConfig, tableName: String, - val conn: (UserConfig, Configuration) - => Connection, @transient var configuration: Configuration) - extends DataSink { +class HBaseSink private[hbase]( + userConfig: UserConfig, tableName: String, factory: HBaseWriterFactory) extends DataSink { - lazy val connection = conn(userConfig, configuration) - lazy val table = connection.getTable(TableName.valueOf(tableName)) - - override def open(context: TaskContext): Unit = {} - - def this(userConfig: UserConfig, tableName: String, configuration: Configuration) = { - this(userConfig, tableName, HBaseSink.getConnection, configuration) - } + private lazy val hbaseWriter = factory.getHBaseWriter(userConfig, tableName) def this(userConfig: UserConfig, tableName: String) = { - this(userConfig, tableName, HBaseConfiguration.create()) - } - - def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { - insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), - Bytes.toBytes(columnName), Bytes.toBytes(value)) + this(userConfig, tableName, new HBaseWriterFactory) } - def insert( - rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]) - : Unit = { - val put = new Put(rowKey) - put.addColumn(columnGroup, columnName, value) - table.put(put) - } + override def open(context: TaskContext): Unit = {} - def put(msg: Any): Unit = { - msg match { - case seq: Seq[Any] => - seq.foreach(put) - case tuple: (_, _, _, _) => { - tuple._1 match { - case str: String => { - insert(tuple._1.asInstanceOf[String], tuple._2.asInstanceOf[String], - tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String]) - } - case byteArray: Array[Byte@unchecked] => { - insert(tuple._1.asInstanceOf[Array[Byte]], tuple._2.asInstanceOf[Array[Byte]], - tuple._3.asInstanceOf[Array[Byte]], tuple._4.asInstanceOf[Array[Byte]]) - } - case _ => - // Skip - } - } - } - } override def write(message: Message): Unit = { - put(message.value) - } - - def close(): Unit = { - table.close() - connection.close() + hbaseWriter.put(message.value) } - /** - * Overrides Java's default serialization - * Please do not remove this - */ - private def writeObject(out: ObjectOutputStream): Unit = { - out.defaultWriteObject() - configuration.write(out) + override def close(): Unit = { + hbaseWriter.close() } - /** - * Overrides Java's default deserialization - * Please do not remove this - */ - private def readObject(in: ObjectInputStream): Unit = { - in.defaultReadObject() - val clientConf = new Configuration(false) - clientConf.readFields(in) - configuration = HBaseConfiguration.create(clientConf) - } } object HBaseSink { + val HBASESINK = "hbasesink" val TABLE_NAME = "hbase.table.name" val COLUMN_FAMILY = "hbase.table.column.family" val COLUMN_NAME = "hbase.table.column.name" val HBASE_USER = "hbase.user" - def apply[T](userConfig: UserConfig, tableName: String, configuration: Configuration) - : HBaseSink = { - new HBaseSink(userConfig, tableName, configuration) - } - - def apply[T](userConfig: UserConfig, tableName: String) - : HBaseSink = { + def apply[T](userConfig: UserConfig, tableName: String): HBaseSink = { new HBaseSink(userConfig, tableName) } @@ -159,6 +95,59 @@ object HBaseSink { .create(UserGroupInformation.createRemoteUser(userName.get)) ConnectionFactory.createConnection(configuration, user) } + } + + class HBaseWriterFactory extends java.io.Serializable { + + def getHBaseWriter(userConfig: UserConfig, tableName: String): HBaseWriter = { + new HBaseWriter(userConfig, tableName) + } + } + + class HBaseWriter(connection: Connection, tableName: String) { + + private val table: Table = connection.getTable(TableName.valueOf(tableName)) + + def this(userConfig: UserConfig, tableName: String) = { + this(getConnection(userConfig, HBaseConfiguration.create()), tableName) + } + + def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = { + insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), + Bytes.toBytes(columnName), Bytes.toBytes(value)) + } + + def insert( + rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte]) + : Unit = { + val put = new Put(rowKey) + put.addColumn(columnGroup, columnName, value) + table.put(put) + } + + def put(msg: Any): Unit = { + msg match { + case seq: Seq[Any] => + seq.foreach(put) + case tuple: (_, _, _, _) => + tuple._1 match { + case _: String => + insert(tuple._1.asInstanceOf[String], tuple._2.asInstanceOf[String], + tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String]) + case _: Array[Byte@unchecked] => + insert(tuple._1.asInstanceOf[Array[Byte]], tuple._2.asInstanceOf[Array[Byte]], + tuple._3.asInstanceOf[Array[Byte]], tuple._4.asInstanceOf[Array[Byte]]) + case _ => + // Skip + } + } + } + + def close(): Unit = { + table.close() + connection.close() + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cce13ad/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala index 22efa89..aa0741d 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala @@ -18,7 +18,6 @@ package org.apache.gearpump.external.hbase.dsl import scala.language.implicitConversions -import org.apache.hadoop.conf.Configuration import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.external.hbase.HBaseSink import org.apache.gearpump.streaming.dsl.scalaapi.Stream @@ -26,15 +25,11 @@ import org.apache.gearpump.streaming.dsl.scalaapi.Stream /** Create a HBase DSL Sink */ class HBaseDSLSink[T](stream: Stream[T]) { - def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, description: String) - : Stream[T] = { - stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, description) + def writeToHbase(userConfig: UserConfig, table: String, + parallelism: Int, description: String): Stream[T] = { + stream.sink(HBaseSink[T](userConfig, table), parallelism, userConfig, description) } - def writeToHbase(userConfig: UserConfig, configuration: Configuration, table: String, - parallism: Int, description: String): Stream[T] = { - stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, userConfig, description) - } } object HBaseDSLSink { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5cce13ad/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala index 62da2b1..3dc9f43 100644 --- a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala +++ b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala @@ -17,7 +17,10 @@ */ package org.apache.gearpump.external.hbase +import akka.actor.ActorSystem +import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.external.hbase.HBaseSink.{HBaseWriter, HBaseWriterFactory} import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.task.TaskContext import org.apache.hadoop.conf.Configuration @@ -25,13 +28,42 @@ import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.util.Bytes import org.mockito.Mockito._ +import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - property("HBaseSink should insert a row successfully") { + + property("HBaseSink should invoke HBaseWriter for writing message to HBase") { + + val hbaseWriter = mock[HBaseWriter] + val hbaseWriterFactory = mock[HBaseWriterFactory] + + implicit val system: ActorSystem = MockUtil.system + + val userConfig = UserConfig.empty + val tableName = "hbase" + + when(hbaseWriterFactory.getHBaseWriter(userConfig, tableName)) + .thenReturn(hbaseWriter) + + val hbaseSink = new HBaseSink(userConfig, tableName, hbaseWriterFactory) + + hbaseSink.open(MockUtil.mockTaskContext) + + forAll(Gen.alphaStr) { (value: String) => + val message = Message(value) + hbaseSink.write(message) + verify(hbaseWriter, atLeastOnce()).put(value) + } + + hbaseSink.close() + verify(hbaseWriter).close() + } + + property("HBaseWriter should insert a row successfully") { val table = mock[Table] val config = mock[Configuration] @@ -54,10 +86,9 @@ class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock val put = new Put(Bytes.toBytes(row)) put.addColumn(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) - val hbaseSink = new HBaseSink(userConfig, tableName, (userConfig, config) - => connection, config) - hbaseSink.open(taskContext) - hbaseSink.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), + + val hbaseWriter = new HBaseWriter(connection, tableName) + hbaseWriter.insert(Bytes.toBytes(row), Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value)) verify(table).put(MockUtil.argMatch[Put](_.getRow sameElements Bytes.toBytes(row)))
