Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d0997e19e -> 914eb9008


[GEARPUMP-233] Add hbase-example

Author: Roshanson <[email protected]>

Closes #107 from Roshanson/HBase.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/914eb900
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/914eb900
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/914eb900

Branch: refs/heads/master
Commit: 914eb9008882835fbafd126085f3ebe73bd87c9f
Parents: d0997e1
Author: Roshanson <[email protected]>
Authored: Fri Dec 9 16:37:03 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Dec 9 16:37:03 2016 +0800

----------------------------------------------------------------------
 .../streaming/examples/hbase/HBaseConn.scala    | 67 ++++++++++++++++++++
 .../streaming/examples/hbase/Split.scala        | 47 ++++++++++++++
 project/BuildExamples.scala                     | 23 ++++++-
 3 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/914eb900/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
 
b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
new file mode 100644
index 0000000..df8c2f5
--- /dev/null
+++ 
b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.hbase
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.external.hbase.HBaseSink
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+import org.slf4j.Logger
+
+/**
+ * Example application that wires a [[Split]] source to an [[HBaseSink]].
+ *
+ * Command line options:
+ *  - `splitNum`: parallelism of the source processor (default 1)
+ *  - `sinkNum`: parallelism of the sink processor (default 1)
+ */
+object HBaseConn extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  // Not referenced in this object; kept because it is a public member.
+  val RUN_FOR_EVER = -1
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "splitNum" -> CLIOption[Int]("<how many split tasks>",
+      required = false, defaultValue = Some(1)),
+    "sinkNum" -> CLIOption[Int]("<how many sink tasks>",
+      required = false, defaultValue = Some(1))
+  )
+
+  /**
+   * Builds the stream application: Split source ~ HashPartitioner ~> sink.
+   *
+   * @param config parsed command line arguments (`splitNum`, `sinkNum`)
+   * @param system actor system, re-exposed as an implicit value below
+   * @return the [[StreamApplication]] named "HBase"
+   */
+  def application(
+      config: ParseResult,
+      system: ActorSystem): StreamApplication = {
+    // Exposed implicitly for callees that take an implicit ActorSystem.
+    implicit val actorSystem: ActorSystem = system
+
+    val splitNum = config.getInt("splitNum")
+    val sinkNum = config.getInt("sinkNum")
+
+    val sourceProcessor = DataSourceProcessor(new Split, splitNum, "Split")
+    // NOTE(review): "hbase" is presumably the target table name - confirm
+    // against HBaseSink.
+    val sink = HBaseSink(UserConfig.empty, "hbase")
+    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+    val partitioner = new HashPartitioner
+    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
+
+    StreamApplication("HBase", Graph(computation), UserConfig.empty)
+  }
+
+  /**
+   * Parses the arguments, submits the application and closes the client.
+   *
+   * @param akkaConf configuration used to create the [[ClientContext]]
+   * @param args raw command line arguments
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    try {
+      val appId = context.submit(application(config, context.system))
+      LOG.info(s"Submitted HBase example application: $appId")
+    } finally {
+      // Release the client even if building or submitting the app fails.
+      context.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/914eb900/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
 
b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
new file mode 100644
index 0000000..d16cd87
--- /dev/null
+++ 
b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.hbase
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Data source for the HBase example. Each `read()` emits one message whose
+ * payload is a tuple of four byte arrays, the first of which encodes an
+ * incrementing counter.
+ */
+class Split extends DataSource {
+
+  // Value encoded into the first tuple element of the next message.
+  private var counter = 0
+
+  /** No-op: this source needs no setup. */
+  override def open(context: TaskContext, startTime: Instant): Unit = ()
+
+  /**
+   * Emits the byte encodings of (counter, "group", "group:name", "99") and
+   * advances the counter by one.
+   *
+   * NOTE(review): the elements are presumably (row key, column family,
+   * column qualifier, value) for HBaseSink - confirm, in particular whether
+   * "group:name" is the intended qualifier.
+   */
+  override def read(): Message = {
+    val first = Bytes.toBytes(s"$counter")
+    counter += 1
+
+    val payload = (
+      first,
+      Bytes.toBytes("group"),
+      Bytes.toBytes("group:name"),
+      Bytes.toBytes("99"))
+    Message(payload)
+  }
+
+  /** No-op: there is nothing to release. */
+  override def close(): Unit = ()
+
+  /** Reports the current wall-clock instant as the watermark. */
+  override def getWatermark: Instant = Instant.now()
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/914eb900/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index bac84a8..5507f00 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -37,9 +37,30 @@ object BuildExamples extends sbt.Build {
     stockcrawler,
     transport,
     wordcount,
-    wordcountJava
+    wordcountJava,
+    example_hbase
   )
 
+  // sbt project for the HBase streaming example located in
+  // examples/streaming/hbase. Its settings combine commonSettings, noPublish
+  // and myAssemblySettings with the project-specific entries below, and it
+  // depends on the streaming, core and external_hbase projects.
+  lazy val example_hbase = Project(
+    id = "gearpump-examples-hbase",
+    base = file("examples/streaming/hbase"),
+    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+      Seq(
+        // hadoop-common, with the commons-beanutils and asm artifacts excluded.
+        libraryDependencies ++= Seq(
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
+            exclude("commons-beanutils", "commons-beanutils-core")
+            exclude("commons-beanutils", "commons-beanutils")
+            exclude("asm", "asm")
+            exclude("org.ow2.asm", "asm")
+        ),
+        // Main class for the packageBin task in the Compile configuration.
+        mainClass in(Compile, packageBin) :=
+          Some("org.apache.gearpump.streaming.examples.hbase.HBaseConn"),
+
+        // Assembly output goes two directories above the project base
+        // (i.e. examples/), under target/<binary Scala version>.
+        target in assembly := baseDirectory.value.getParentFile.getParentFile 
/ "target" /
+          CrossVersion.binaryScalaVersion(scalaVersion.value)
+      )
+  ) dependsOn(streaming % "test->test; provided", core % "provided", 
external_hbase)
+
   lazy val wordcountJava = Project(
     id = "gearpump-examples-wordcountjava",
     base = file("examples/streaming/wordcount-java"),

Reply via email to