Repository: incubator-s2graph
Updated Branches:
  refs/heads/master da2209dfc -> 7af37dbd3


fix broken test


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/204efaba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/204efaba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/204efaba

Branch: refs/heads/master
Commit: 204efabafcf25411f89cacbd5817f0dc8097b2e3
Parents: a18ec27
Author: daewon <[email protected]>
Authored: Thu Apr 26 17:50:17 2018 +0900
Committer: daewon <[email protected]>
Committed: Thu Apr 26 17:50:17 2018 +0900

----------------------------------------------------------------------
 build.sbt                                       |  2 +-
 .../org/apache/s2graph/core/mysqls/Model.scala  |  3 +-
 s2jobs/build.sbt                                |  4 ++
 .../org/apache/s2graph/s2jobs/task/Task.scala   | 13 ++++---
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   | 39 ++++++++++++++++++--
 .../s2jobs/loader/GraphFileGeneratorTest.scala  |  7 ++++
 .../apache/s2graph/s2jobs/task/SinkTest.scala   | 16 ++++----
 .../apache/s2graph/s2jobs/task/SourceTest.scala |  7 +++-
 8 files changed, 72 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 9e3b84f..8d498c4 100755
--- a/build.sbt
+++ b/build.sbt
@@ -78,7 +78,7 @@ lazy val s2graph_gremlin = project.dependsOn(s2core)
   .settings(commonSettings: _*)
 
 lazy val root = (project in file("."))
-  .aggregate(s2core, s2rest_play)
+  .aggregate(s2core, s2rest_play, s2jobs)
   .dependsOn(s2rest_play, s2rest_netty, s2jobs, s2counter_loader, s2graphql) // this enables packaging on the root project
   .settings(commonSettings: _*)
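
Aggregating s2jobs under the root project means root-level sbt tasks now fan out to it, so its test suites run in a plain root build instead of requiring an explicit module invocation. A brief usage note (standard sbt aggregation semantics):

// At the repository root, after this change:
//   sbt test     // also compiles and runs the s2jobs tests
//   sbt package  // also packages the s2jobs artifact
// Previously s2jobs tasks had to be run explicitly, e.g. "sbt s2jobs/test".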
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
index e21072e..939ab93 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
@@ -164,12 +164,13 @@ object Model {
 
   def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
     case None => Map.empty
+    case Some(v) if v.trim == "" => Map.empty
     case Some(v) =>
       try {
         Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
       } catch {
         case e: Exception =>
-          logger.error(s"An error occurs while parsing the extra label option", e)
+          logger.error(s"An error occurs while parsing the extra label option: ${v}", e)
           Map.empty
       }
   }
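
The new guard short-circuits blank option strings before they reach Json.parse, which would otherwise throw and log a parse error for every label whose options column is empty. A self-contained sketch of the resulting behavior, with the project logger swapped for println:

import play.api.libs.json.{JsObject, JsValue, Json}

def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
  case None => Map.empty
  case Some(v) if v.trim == "" => Map.empty // blank is treated like None: no parse attempt
  case Some(v) =>
    try {
      Json.parse(v).asOpt[JsObject].map(_.fields.toMap).getOrElse(Map.empty)
    } catch {
      case e: Exception =>
        println(s"An error occurs while parsing the extra label option: ${v} (${e.getMessage})") // stand-in for logger.error
        Map.empty
    }
}

// extraOptions(Some(""))                   // Map.empty, nothing logged
// extraOptions(Some("""{"ttl": 86400}""")) // Map("ttl" -> JsNumber(86400))
// extraOptions(Some("not json"))           // logs the offending value, returns Map.empty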

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index e966fa6..c41edd8 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -56,6 +56,10 @@ assemblyShadeRules in assembly := Seq(
   ShadeRule.rename("com.google.protobuf.**" -> "org.apache.s2graph.shade.google.protobuf.@1").inAll
 )
 
+projectDependencies := Seq(
+  (projectID in "s2core").value exclude("org.mortbay.jetty", "j*") exclude("javax.xml.stream", "s*") exclude("javax.servlet", "s*") exclude("net.jpountz.lz4", "lz4")
+)
+
 test in assembly := {}
 
 parallelExecution in Test := false
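
The exclude chain keeps conflicting jetty/servlet/lz4 artifacts pulled in by s2core out of the s2jobs assembly, using Ivy name patterns such as "j*" and "s*". For comparison, a hypothetical sketch of the same intent via sbt's ExclusionRule, which matches whole organizations rather than name patterns (the module ID and version here are illustrative, not taken from this build):

libraryDependencies += ("org.apache.s2graph" %% "s2core" % "0.2.1-SNAPSHOT").excludeAll(
  ExclusionRule(organization = "org.mortbay.jetty"),
  ExclusionRule(organization = "javax.xml.stream"),
  ExclusionRule(organization = "javax.servlet"),
  ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
)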

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index 89c8dcd..ea42828 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -42,16 +42,19 @@ object TaskConf {
     taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
   }
 }
-case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)
+
+case class TaskConf(name: String, `type`: String, inputs: Seq[String] = Nil, options: Map[String, String] = Map.empty)
 
 trait Task extends Serializable with Logger {
-  val conf:TaskConf
+  val conf: TaskConf
   val LOG_PREFIX = s"[${this.getClass.getSimpleName}]"
 
-  def mandatoryOptions:Set[String]
-  def isValidate:Boolean = mandatoryOptions.subsetOf(conf.options.keySet)
+  def mandatoryOptions: Set[String]
+
+  def isValidate: Boolean = mandatoryOptions.subsetOf(conf.options.keySet)
 
-  require(isValidate, s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}'
+  require(isValidate,
+    s"""${LOG_PREFIX} not exists mandatory options '${mandatoryOptions.mkString(",")}'
                           in task options (${conf.options.keySet.mkString(",")})
                       """.stripMargin)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 3fa8cea..9037aba 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -20,10 +20,12 @@
 package org.apache.s2graph.s2jobs
 
 import java.io.{File, PrintWriter}
+import java.nio.file.Path
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.mysqls.{Label, Service, ServiceColumn}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
@@ -38,7 +40,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
     tempDir = "/tmp/bulkload_tmp",
     output = "/tmp/s2graph_bulkload",
     zkQuorum = "localhost",
-    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    dbUrl = "jdbc:h2:file:./var/metastore_jobs;MODE=MYSQL",
     dbUser = "sa",
     dbPassword = "sa",
     dbDriver = "org.h2.Driver",
@@ -61,17 +63,46 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
   )
 
   override def beforeAll(): Unit = {
+
     // initialize spark context.
     super.beforeAll()
+    s2 = S2GraphHelper.getS2Graph(s2Config, true)
+
+    deleteRecursively(new File(options.output))
+    deleteRecursively(new File(options.tempDir))
+
+    def createDummyHbaseTable(hTableName: String): Unit = {
+      Try {
+        val hTables = Service.findAll().map(_.hTableName).toSet
+        if (!hTables(hTableName)) {
+          s2.management.createService(serviceName = s"_dummy_${hTableName}", cluster = "localhost",
+            hTableName = hTableName, preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
+        }
+      }
+    }
+
+    Seq(
+      "s2graph",
+      "_test_cases",
+      "test-htable",
+      "label_with_ttl",
+      "label_without_ttl",
+      "label_with_ttl_copied",
+      "label_without_ttl_copied",
+      "s2graph_with_ttl-dev",
+      "s2graph_without_ttl-dev",
+      "s2graph_label_test_copied"
+    ).foreach(createDummyHbaseTable)
 
-    s2 = S2GraphHelper.getS2Graph(s2Config)
     initTestDataFile
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
 
-    if (s2 != null) s2.shutdown()
+    if (s2 != null) {
+      s2.shutdown()
+    }
   }
 
   def initTestDataFile: Unit = {
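
deleteRecursively is used here and again in GraphFileGeneratorTest to clear stale bulk-load output between runs; the helper itself is defined outside this diff. A self-contained sketch of the assumed semantics:

import java.io.File

// Assumed behavior: delete a file, or a directory and everything beneath it.
def deleteRecursively(file: File): Unit = {
  if (file.isDirectory)
    Option(file.listFiles).getOrElse(Array.empty[File]).foreach(deleteRecursively)
  if (file.exists && !file.delete())
    throw new java.io.IOException(s"Failed to delete ${file.getAbsolutePath}")
}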

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index dfdb595..872b3f4 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,6 +19,8 @@
 
 package org.apache.s2graph.s2jobs.loader
 
+import java.io.File
+
 import org.apache.s2graph.core.{PostProcess, S2VertexLike}
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
@@ -184,6 +186,11 @@ class GraphFileGeneratorTest extends BaseSparkTest {
   //   this test case expect options.input already exist with valid bulk load format.
     test("bulk load and fetch vertex: spark mode") {
       import scala.collection.JavaConverters._
+
+
+      deleteRecursively(new File(options.output))
+      deleteRecursively(new File(options.tempDir))
+
       val serviceColumn = initTestVertexSchema(s2)
 
       val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
index 628c5de..2ea47a0 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SinkTest.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.s2jobs.BaseSparkTest
 import org.apache.spark.sql.DataFrame
 
@@ -30,6 +31,7 @@ class SinkTest extends BaseSparkTest {
     super.beforeAll()
     initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
   }
+
   def toDataFrame(edges: Seq[String]): DataFrame = {
     import spark.sqlContext.implicits._
     val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
@@ -51,8 +53,8 @@ class SinkTest extends BaseSparkTest {
     val df = toDataFrame(Seq(bulkEdgeString))
     val args = Map("writeMethod" -> "bulk") ++
       options.toCommand.grouped(2).map { kv =>
-      kv.head -> kv.last
-    }.toMap
+        kv.head -> kv.last
+      }.toMap
 
     val conf = TaskConf("test", "sql", Seq("input"), args)
 
@@ -62,7 +64,7 @@ class SinkTest extends BaseSparkTest {
     val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
     s2Edges.foreach { edge => println(edge) }
 
-    val filteredEdges = s2Edges.filter{ edge =>
+    val filteredEdges = s2Edges.filter { edge =>
       edge.srcVertex.innerIdVal.toString == "a" &&
         edge.tgtVertex.innerIdVal.toString == "b" &&
         edge.label() == "friends"
@@ -85,11 +87,11 @@ class SinkTest extends BaseSparkTest {
     val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
     s2Edges.foreach { edge => println(edge) }
 
-    val filteredEdges = s2Edges.filter{ edge =>
+    val filteredEdges = s2Edges.filter { edge =>
       edge.srcVertex.innerIdVal.toString == "b" &&
-      edge.tgtVertex.innerIdVal.toString == "c" &&
-      edge.getTs() == 1416236400000L &&
-      edge.label() == "friends"
+        edge.tgtVertex.innerIdVal.toString == "c" &&
+        edge.getTs() == 1416236400000L &&
+        edge.label() == "friends"
     }
 
     assert(filteredEdges.size == 1)
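
The args construction in this test relies on grouped(2) to fold the flat flag/value token list from options.toCommand into a Map; a minimal sketch of the idiom (the tokens are hypothetical, and an even-length sequence is assumed):

val tokens = Seq("--zkQuorum", "localhost", "--table", "test-htable")
val argMap = tokens.grouped(2).map(kv => kv.head -> kv.last).toMap
// argMap == Map("--zkQuorum" -> "localhost", "--table" -> "test-htable")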

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/204efaba/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 9b9a016..f1d1bb1 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -61,7 +61,9 @@ class SourceTest extends BaseSparkTest {
     // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
     s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
       import scala.collection.JavaConverters._
-      if (admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
+
+      val set = admin.listSnapshots(snapshotTableName).asScala.toList.map(_.getName).toSet
+      if (set(snapshotTableName))
         admin.deleteSnapshot(snapshotTableName)
 
       admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
@@ -99,6 +101,8 @@ class SourceTest extends BaseSparkTest {
       s"1416236400000\tinsert\tedge\ta\tc\t${label.label}\t{}"
     )
 
+    s2.defaultStorage.truncateTable(s2Config, label.hTableName)
+
     val (_inputEdges, _outputEdges) = runCheck(bulkEdges, Schema.EdgeSchema, "e", "IndexEdge")
     val inputEdges = _inputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
     val outputEdges = _outputEdges.sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
@@ -111,6 +115,7 @@ class SourceTest extends BaseSparkTest {
 
   ignore("S2GraphSource vertex toDF") {
     val column = initTestVertexSchema(s2)
+
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
 
     val bulkVertices = Seq(
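
For context on the snapshot fix above: Admin.listSnapshots returns SnapshotDescription objects, not names, and toSet[B >: A] lets the old membership test compile against a String while always returning false, so pre-existing snapshots were never deleted. A minimal sketch of the corrected check, assuming the HBase Admin API used in the diff:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.client.Admin

// Map each SnapshotDescription to its name before testing membership.
def snapshotExists(admin: Admin, name: String): Boolean =
  admin.listSnapshots(name).asScala.map(_.getName).toSet.contains(name)

// if (snapshotExists(admin, snapshotTableName)) admin.deleteSnapshot(snapshotTableName)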
