Repository: incubator-gearpump Updated Branches: refs/heads/master d0997e19e -> 914eb9008
[GEARPUMP-233]Add hbase-example Author: Roshanson <[email protected]> Closes #107 from Roshanson/HBase. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/914eb900 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/914eb900 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/914eb900 Branch: refs/heads/master Commit: 914eb9008882835fbafd126085f3ebe73bd87c9f Parents: d0997e1 Author: Roshanson <[email protected]> Authored: Fri Dec 9 16:37:03 2016 +0800 Committer: manuzhang <[email protected]> Committed: Fri Dec 9 16:37:03 2016 +0800 ---------------------------------------------------------------------- .../streaming/examples/hbase/HBaseConn.scala | 67 ++++++++++++++++++++ .../streaming/examples/hbase/Split.scala | 47 ++++++++++++++ project/BuildExamples.scala | 23 ++++++- 3 files changed, 136 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/914eb900/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala new file mode 100644 index 0000000..df8c2f5 --- /dev/null +++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/HBaseConn.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.hbase
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.external.hbase.HBaseSink
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+import org.slf4j.Logger
+
+/** Example that writes rows generated by [[Split]] to HBase through [[HBaseSink]]. */
+object HBaseConn extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val RUN_FOR_EVER = -1
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "splitNum" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
+    "sinkNum" -> CLIOption[Int]("<how many sink tasks>", required = false, defaultValue = Some(1))
+  )
+
+  /** DAG: `splitNum` [[Split]] sources -> hash partitioner -> `sinkNum` HBase sink tasks. */
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    implicit val actorSystem: ActorSystem = system
+    val splitNum = config.getInt("splitNum")
+    val sinkNum = config.getInt("sinkNum")
+    val sourceProcessor = DataSourceProcessor(new Split, splitNum, "Split")
+    val sinkProcessor = DataSinkProcessor(HBaseSink(UserConfig.empty, "hbase"), sinkNum)
+    val partitioner = new HashPartitioner
+    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
+    StreamApplication("HBase", Graph(computation), UserConfig.empty)
+  }
+
+  /** Submits the application; the client context is closed even if submission fails. */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    try {
+      val appId = context.submit(application(config, context.system))
+      LOG.info(s"Submitted HBase example application, appId: $appId")
+    } finally {
+      context.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/914eb900/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
new file mode 100644
index 0000000..d16cd87
--- /dev/null
+++ b/examples/streaming/hbase/src/main/scala/org/apache/gearpump/streaming/examples/hbase/Split.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.hbase
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.hadoop.hbase.util.Bytes
+
+/** Endless example source: every read() emits one 4-tuple of byte arrays for the HBase sink. */
+class Split extends DataSource {
+  // Long rather than Int so the row-key counter cannot overflow into negative keys.
+  private var x = 0L
+
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  override def read(): Message = {
+    // Presumably (row key, column family, qualifier, value) as consumed by HBaseSink -- confirm.
+    // NOTE(review): qualifier "group:name" repeats the family prefix "group"; verify intent.
+    val tuple = (Bytes.toBytes(s"$x"), Bytes.toBytes("group"),
+      Bytes.toBytes("group:name"), Bytes.toBytes("99"))
+    x += 1
+    Message(tuple)
+  }
+
+  override def close(): Unit = {}
+
+  override def getWatermark: Instant = Instant.now()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/914eb900/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index bac84a8..5507f00 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -37,9 +37,30 @@ object BuildExamples extends sbt.Build {
       stockcrawler,
       transport,
       wordcount,
-      wordcountJava
+      wordcountJava,
+      example_hbase
     )
 
+  lazy val example_hbase = Project(
+    id = "gearpump-examples-hbase",
+    base = file("examples/streaming/hbase"),
+    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
+            exclude("commons-beanutils", "commons-beanutils-core")
+            exclude("commons-beanutils", "commons-beanutils")
+            exclude("asm", "asm")
+            exclude("org.ow2.asm", "asm")
+        ),
+        mainClass in(Compile, packageBin) :=
+          Some("org.apache.gearpump.streaming.examples.hbase.HBaseConn"),
+
+        target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" /
+          CrossVersion.binaryScalaVersion(scalaVersion.value)
+      )
+  ) dependsOn(streaming % "test->test; provided", core % "provided", external_hbase)
+
   lazy val wordcountJava = Project(
     id = "gearpump-examples-wordcountjava",
     base = file("examples/streaming/wordcount-java"),
