Change S2Graph to use a local embedded HMaster when hbase.zookeeper.quorum is localhost.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/1a15af36 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/1a15af36 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/1a15af36 Branch: refs/heads/master Commit: 1a15af36b42909fff1aaf2fbacf23d01edfe6155 Parents: d05d8a4 Author: DO YUNG YOON <[email protected]> Authored: Sat Mar 4 14:42:22 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Mar 4 22:29:22 2017 +0900 ---------------------------------------------------------------------- .gitignore | 2 + s2core/build.sbt | 4 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 14 ++-- .../core/storage/hbase/AsynchbaseStorage.scala | 67 ++++++++++++++++++++ .../core/tinkerpop/S2GraphProvider.scala | 9 --- 5 files changed, 77 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1a15af36/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 1f0f5b2..9f295f2 100644 --- a/.gitignore +++ b/.gitignore @@ -106,3 +106,5 @@ server.pid /dist/ .cache +### Local Embedded HBase Data ### +storage/ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1a15af36/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 9ea975c..9cfc966 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -42,11 +42,11 @@ libraryDependencies ++= Seq( "io.netty" % "netty" % "3.9.4.Final" force(), "org.hbase" % "asynchbase" % "1.7.2" excludeLogging(), "net.bytebuddy" % "byte-buddy" % "1.4.26", - "org.apache.tinkerpop" % "gremlin-core" % tinkerpopVersion, + "org.apache.tinkerpop" % "gremlin-core" % tinkerpopVersion excludeLogging(), "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion 
% "test", "org.scalatest" %% "scalatest" % "2.2.4" % "test", "org.specs2" %% "specs2-core" % specs2Version % "test", - "mysql" % "mysql-connector-java" % "5.1.40" + "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1a15af36/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index bd6c45a..cfd85b1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.core import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} @@ -62,10 +62,8 @@ object S2Graph { "hbase.table.name" -> "s2graph", "hbase.table.compression.algorithm" -> "gz", "phase" -> "dev", -// "db.default.driver" -> "org.h2.Driver", -// "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL", - "db.default.driver" -> "com.mysql.jdbc.Driver", - "db.default.url" -> "jdbc:mysql://default:3306/graph_dev", + "db.default.driver" -> "org.h2.Driver", + "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL", "db.default.password" -> "graph", "db.default.user" -> "graph", "cache.max.size" -> java.lang.Integer.valueOf(10000), @@ -533,8 +531,8 @@ object S2Graph { @Graph.OptIn(Graph.OptIn.SUITE_STRUCTURE_STANDARD) @Graph.OptOuts(value = Array( // passed - new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", method="*", reason="no"), // pass - new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", method="*", reason="no"), // pass +// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", method="*", reason="no"), // pass +// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", method="*", reason="no"), // pass new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexPropertyTest", method="*", reason="no"), // pass new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexTest", method="*", reason="no"), // pss @@ -547,7 +545,7 @@ object S2Graph { new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method="*", reason="no"), // pass new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexPropertyTest", method="*", reason="no"), // pass -// new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="*", reason="no"), // pass + new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", method="*", reason="no"), // pass new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method="*", reason="no"), // pass new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceGraphTest", method="*", reason="no"), // pass http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1a15af36/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index ede1933..dab5aae 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -23,9 +23,11 @@ 
package org.apache.s2graph.core.storage.hbase import java.util import java.util.Base64 +import java.util.concurrent.{TimeUnit, ExecutorService, Executors} import com.stumbleupon.async.{Callback, Deferred} import com.typesafe.config.Config +import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} import org.apache.hadoop.hbase.io.compress.Compression.Algorithm @@ -91,6 +93,61 @@ object AsynchbaseStorage { case class ScanWithRange(scan: Scanner, offset: Int, limit: Int) type AsyncRPC = Either[GetRequest, ScanWithRange] + + def initLocalHBase(config: Config, + overwrite: Boolean = true): ExecutorService = { + import java.net.Socket + import java.io.{File, IOException} + + lazy val hbaseExecutor = { + val executor = Executors.newSingleThreadExecutor() + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run(): Unit = { + executor.shutdown() + } + }) + + val hbaseAvailable = try { + val (host, port) = config.getString("hbase.zookeeper.quorum").split(":") match { + case Array(h, p) => (h, p.toInt) + case Array(h) => (h, 2181) + } + + val socket = new Socket(host, port) + socket.close() + true + } catch { + case e: IOException => false + } + + if (!hbaseAvailable) { + // start HBase + executor.submit(new Runnable { + override def run(): Unit = { + val cwd = new File(".").getAbsolutePath + if (overwrite) { + val dataDir = new File(s"$cwd/storage/s2graph") + FileUtils.deleteDirectory(dataDir) + } + + System.setProperty("proc_master", "") + System.setProperty("hbase.log.dir", s"$cwd/storage/s2graph/hbase/") + System.setProperty("hbase.log.file", s"$cwd/storage/s2graph/hbase.log") + System.setProperty("hbase.tmp.dir", s"$cwd/storage/s2graph/hbase/") + System.setProperty("hbase.home.dir", "") + System.setProperty("hbase.id.str", "s2graph") + System.setProperty("hbase.root.logger", "INFO,RFA") + + org.apache.hadoop.hbase.master.HMaster.main(Array[String]("start")) + } 
+ }) + } + + executor + } + hbaseExecutor + } } @@ -100,6 +157,12 @@ class AsynchbaseStorage(override val graph: S2Graph, import Extensions.DeferOps + val hbaseExecutor: ExecutorService = + if (config.getString("hbase.zookeeper.quorum") == "localhost") + AsynchbaseStorage.initLocalHBase(config) + else + null + /** * Asynchbase client setup. * note that we need two client, one for bulk(withWait=false) and another for withWait=true @@ -476,6 +539,10 @@ class AsynchbaseStorage(override val graph: S2Graph, clients.foreach { client => AsynchbaseStorage.shutdown(client) } + if (hbaseExecutor != null) { + hbaseExecutor.shutdown() + hbaseExecutor.awaitTermination(1, TimeUnit.MINUTES) + } } override def createTable(_zkAddr: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/1a15af36/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala index 5a997e8..741be06 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala @@ -32,20 +32,11 @@ class S2GraphProvider extends AbstractGraphProvider { override def getBaseConfiguration(s: String, aClass: Class[_], s1: String, graphData: GraphData): util.Map[String, AnyRef] = { val config = ConfigFactory.load() -// val dbUrl = -// if (config.hasPath("db.default.url")) config.getString("db.default.url") -// else "jdbc:mysql://localhost:3306/graph_dev" - - val dbUrl = "jdbc:mysql://localhost:3306/graph_dev" val m = new java.util.HashMap[String, AnyRef]() m.put(Graph.GRAPH, classOf[S2Graph].getName) - m.put("db.default.url", dbUrl) - m.put("db.default.driver", "com.mysql.jdbc.Driver") m } - private val H2Prefix = "jdbc:h2:file:" - override def 
clear(graph: Graph, configuration: Configuration): Unit = if (graph != null) { val s2Graph = graph.asInstanceOf[S2Graph]
