Repository: incubator-s2graph
Updated Branches:
  refs/heads/master da2209dfc -> 7af37dbd3
fix broken test

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/204efaba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/204efaba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/204efaba

Branch: refs/heads/master
Commit: 204efabafcf25411f89cacbd5817f0dc8097b2e3
Parents: a18ec27
Author: daewon <[email protected]>
Authored: Thu Apr 26 17:50:17 2018 +0900
Committer: daewon <[email protected]>
Committed: Thu Apr 26 17:50:17 2018 +0900

----------------------------------------------------------------------
 build.sbt                                       |  2 +-
 .../org/apache/s2graph/core/mysqls/Model.scala  |  3 +-
 s2jobs/build.sbt                                |  4 ++
 .../org/apache/s2graph/s2jobs/task/Task.scala   | 13 ++++---
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   | 39 ++++++++++++++++++--
 .../s2jobs/loader/GraphFileGeneratorTest.scala  |  7 ++++
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 16 ++++----
 .../apache/s2graph/s2jobs/task/SourceTest.scala |  7 +++-
 8 files changed, 72 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 9e3b84f..8d498c4 100755
--- a/build.sbt
+++ b/build.sbt
@@ -78,7 +78,7 @@ lazy val s2graph_gremlin = project.dependsOn(s2core)
   .settings(commonSettings: _*)
 
 lazy val root = (project in file("."))
-  .aggregate(s2core, s2rest_play)
+  .aggregate(s2core, s2rest_play, s2jobs)
   .dependsOn(s2rest_play, s2rest_netty, s2jobs, s2counter_loader, s2graphql) // this enables packaging on the root project
   .settings(commonSettings: _*)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
index e21072e..939ab93 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
@@ -164,12 +164,13 @@ object Model {
 
   def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
     case None => Map.empty
+    case Some(v) if v.trim == "" => Map.empty
     case Some(v) =>
       try {
         Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
       } catch {
         case e: Exception =>
-          logger.error(s"An error occurs while parsing the extra label option", e)
+          logger.error(s"An error occurs while parsing the extra label option: ${v}", e)
           Map.empty
       }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index e966fa6..c41edd8 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -56,6 +56,10 @@ assemblyShadeRules in assembly := Seq(
   ShadeRule.rename("com.google.protobuf.**" -> "org.apache.s2graph.shade.google.protobuf.@1").inAll
 )
 
+projectDependencies := Seq(
+  (projectID in "s2core").value exclude("org.mortbay.jetty", "j*") exclude("javax.xml.stream", "s*") exclude("javax.servlet", "s*") exclude("net.jpountz.lz4", "lz4")
+)
+
 test in assembly := {}
 
 parallelExecution in Test := false
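For reference, the extraOptions change above makes a blank option string behave like an absent one, so only non-empty text ever reaches the JSON parser. A minimal standalone sketch of that behavior, assuming play-json as used in s2core (the sample option strings are illustrative only):

    import play.api.libs.json.{JsObject, JsValue, Json}

    object ExtraOptionsSketch {
      // None and blank strings both yield an empty map; malformed JSON is
      // swallowed (the real code also logs it) instead of failing label lookup.
      def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
        case None => Map.empty
        case Some(v) if v.trim == "" => Map.empty
        case Some(v) =>
          try {
            Json.parse(v).asOpt[JsObject].map(_.fields.toMap).getOrElse(Map.empty)
          } catch {
            case _: Exception => Map.empty
          }
      }

      def main(args: Array[String]): Unit = {
        assert(extraOptions(None).isEmpty)
        assert(extraOptions(Some("   ")).isEmpty)             // the newly added guard
        assert(extraOptions(Some("""{"ttl": "86400"}""")).size == 1)
        assert(extraOptions(Some("not json")).isEmpty)        // parse failure falls back to empty
      }
    }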
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index 89c8dcd..ea42828 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,16 +42,19 @@ object TaskConf {
     taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
   }
 }
-case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
+
+case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
 
 trait Task extends Serializable with Logger {
-  val conf:TaskConf
+  val conf: TaskConf
 
   val LOG_PREFIX = s"[${this.getClass.getSimpleName}]"
 
-  def mandatoryOptions:Set[String]
-  def isValidate:Boolean = mandatoryOptions.subsetOf(conf.options.keySet)
+  def mandatoryOptions: Set[String]
+
+  def isValidate: Boolean = mandatoryOptions.subsetOf(conf.options.keySet)
 
-  require(isValidate, s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}'
+  require(isValidate,
+    s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}'
        in task options (${conf.options.keySet.mkString(",")})
        """.stripMargin)
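For reference, the require in the reformatted Task trait above makes every task fail fast at construction time when a mandatory option is missing. A condensed sketch of the check in isolation (DummyTask and its "path" option are hypothetical, shown only to illustrate the failure mode):

    case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil,
                        options: Map[String, String] = Map.empty)

    trait Task {
      val conf: TaskConf
      def mandatoryOptions: Set[String]
      def isValidate: Boolean = mandatoryOptions.subsetOf(conf.options.keySet)

      // Runs when a concrete task is instantiated, so a misconfigured job
      // aborts before any Spark work is launched.
      require(isValidate,
        s"missing mandatory options '${mandatoryOptions.mkString(",")}' " +
          s"in task options (${conf.options.keySet.mkString(",")})")
    }

    // Hypothetical task type, used only to show the check firing.
    class DummyTask(val conf: TaskConf) extends Task {
      override def mandatoryOptions: Set[String] = Set("path")
    }

    object TaskConfCheck {
      def main(args: Array[String]): Unit = {
        new DummyTask(TaskConf("ok", "dummy", options = Map("path" -> "/tmp/x"))) // passes
        try new DummyTask(TaskConf("bad", "dummy"))                               // missing "path"
        catch { case e: IllegalArgumentException => println(e.getMessage) }
      }
    }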
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 3fa8cea..9037aba 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -20,10 +20,12 @@
 package org.apache.s2graph.s2jobs
 
 import java.io.{File, PrintWriter}
+import java.nio.file.Path
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.mysqls.{Label, Service, ServiceColumn}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
@@ -38,7 +40,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
     tempDir = "/tmp/bulkload_tmp",
     output = "/tmp/s2graph_bulkload",
     zkQuorum = "localhost",
-    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    dbUrl = "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL",
     dbUser = "sa",
     dbPassword = "sa",
     dbDriver = "org.h2.Driver",
@@ -61,17 +63,46 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
   )
 
   override def beforeAll(): Unit = {
+    // initialize spark context.
     super.beforeAll()
+    s2 = S2GraphHelper.getS2Graph(s2Config, true)
+
+    deleteRecursively(new File(options.output))
+    deleteRecursively(new File(options.tempDir))
+
+    def createDummyHbaseTable(hTableName: String): Unit = {
+      Try {
+        val hTables = Service.findAll().map(_.hTableName).toSet
+        if (!hTables(hTableName)) {
+          s2.management.createService(serviceName = s"_dummy_${hTableName}", cluster = "localhost",
+            hTableName = hTableName, preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
+        }
+      }
+    }
+
+    Seq(
+      "s2graph",
+      "_test_cases",
+      "test-htable",
+      "label_with_ttl",
+      "label_without_ttl",
+      "label_with_ttl_copied",
+      "label_without_ttl_copied",
+      "s2graph_with_ttl-dev",
+      "s2graph_without_ttl-dev",
+      "s2graph_label_test_copied"
+    ).foreach(createDummyHbaseTable)
 
-    s2 = S2GraphHelper.getS2Graph(s2Config)
     initTestDataFile
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    if (s2 != null) s2.shutdown()
+    if (s2 != null) {
+      s2.shutdown()
+    }
   }
 
   def initTestDataFile: Unit = {
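beforeAll above wipes the bulk-load output and temp directories before registering the dummy services, so repeated runs start from a clean state. If deleteRecursively is not already supplied by a base trait, a helper along these lines would do (hypothetical sketch, not the project's actual implementation):

    import java.io.File

    object FileCleanup {
      // Depth-first removal of a file or directory tree; a missing path is a no-op.
      def deleteRecursively(file: File): Unit = {
        if (file.isDirectory)
          Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
        if (file.exists()) file.delete()
      }
    }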
test("bulk load and fetch vertex: spark mode") { import scala.collection.JavaConverters._ + + + deleteRecursively(new File(options.output)) + deleteRecursively(new File(options.tempDir)) + val serviceColumn = initTestVertexSchema(s2) val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala index 628c5de..2ea47a0 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.s2jobs.task import org.apache.s2graph.core.S2EdgeLike +import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.s2jobs.BaseSparkTest import org.apache.spark.sql.DataFrame @@ -30,6 +31,7 @@ class SinkTest extends BaseSparkTest { super.beforeAll() initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) } + def toDataFrame(edges: Seq[String]): DataFrame = { import spark.sqlContext.implicits._ val elements = edges.flatMap(s2.elementBuilder.toEdge(_)) @@ -51,8 +53,8 @@ class SinkTest extends BaseSparkTest { val df = toDataFrame(Seq(bulkEdgeString)) val args = Map("writeMethod" -> "bulk") ++ options.toCommand.grouped(2).map { kv => - kv.head -> kv.last - }.toMap + kv.head -> kv.last + }.toMap val conf = TaskConf("test", "sql", Seq("input"), args) @@ -62,7 +64,7 @@ class SinkTest extends BaseSparkTest { val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike]) s2Edges.foreach { edge => println(edge) } - val filteredEdges = s2Edges.filter{ edge => + val filteredEdges = s2Edges.filter { edge => edge.srcVertex.innerIdVal.toString == "a" && edge.tgtVertex.innerIdVal.toString == "b" && edge.label() == "friends" @@ -85,11 +87,11 @@ class SinkTest extends BaseSparkTest { val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike]) s2Edges.foreach { edge => println(edge) } - val filteredEdges = s2Edges.filter{ edge => + val filteredEdges = s2Edges.filter { edge => edge.srcVertex.innerIdVal.toString == "b" && - edge.tgtVertex.innerIdVal.toString == "c" && - edge.getTs() == 1416236400000L && - edge.label() == "friends" + edge.tgtVertex.innerIdVal.toString == "c" && + edge.getTs() == 1416236400000L && + edge.label() == "friends" } assert(filteredEdges.size == 1) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala index 9b9a016..f1d1bb1 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala @@ -61,7 +61,9 @@ class SourceTest extends BaseSparkTest { // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat. 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 9b9a016..f1d1bb1 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -61,7 +61,9 @@ class SourceTest extends BaseSparkTest {
     // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
     s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
       import scala.collection.JavaConverters._
-      if (admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
+
+      val set = admin.listSnapshots(snapshotTableName).asScala.toList.map(_.getName).toSet
+      if (set(snapshotTableName))
         admin.deleteSnapshot(snapshotTableName)
 
       admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
@@ -99,6 +101,8 @@ class SourceTest extends BaseSparkTest {
       s"1416236400000\tinsert\tedge\ta\tc\t${label.label}\t{}"
     )
 
+    s2.defaultStorage.truncateTable(s2Config, label.hTableName)
+
     val (_inputEdges, _outputEdges) = runCheck(bulkEdges, Schema.EdgeSchema, "e", "IndexEdge")
     val inputEdges = _inputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
     val outputEdges = _outputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
@@ -111,6 +115,7 @@ class SourceTest extends BaseSparkTest {
 
   ignore("S2GraphSource vertex toDF") {
     val column = initTestVertexSchema(s2)
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 
     val bulkVertices = Seq(
