filter OptionKeys on GraphFileOptions.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c129ed01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c129ed01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c129ed01

Branch: refs/heads/master
Commit: c129ed01fed984faebff3eff640d4c1eab249c98
Parents: 1ac70d5
Author: DO YUNG YOON <[email protected]>
Authored: Tue Apr 3 15:41:13 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Apr 3 15:41:13 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala     | 9 ---------
 .../org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala | 6 ++++++
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala    | 2 +-
 .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala    | 9 +++++++++
 4 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 69b3716..6e68d28 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,10 +24,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
-import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.StructType
 import play.api.libs.json.{JsObject, Json}
 
@@ -85,12 +82,6 @@ object S2GraphHelper {
     }
   }
 
-  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
-    val args = taskConf.options.flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
-
-    GraphFileOptions.toOption(args)
-  }
-
   def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, 
reservedColumn: Set[String]): Option[GraphElement] = {
     val timestamp = row.getAs[Long]("timestamp")
     val operation = Try(row.getAs[String]("operation")).getOrElse("insert")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 4bf8379..8aa540c 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -20,6 +20,12 @@
 package org.apache.s2graph.s2jobs.loader
 
 object GraphFileOptions {
+  val OptionKeys = Set(
+    "--input", "--tempDir", "--output", "--zkQuorum", "--table", "--dbUrl", 
"--dbUser", "--dbPassword", "--dbDriver",
+    "--maxHFilePerRegionServer", "--numRegions", "--labelMapping", 
"--autoEdgeCreate", "--buildDegree", "--incrementalLoad",
+    "--method"
+  )
+
   val parser = new scopt.OptionParser[GraphFileOptions]("run") {
 
     opt[String]('i', "input").required().action( (x, c) =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 8c57539..0dce19a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -210,7 +210,7 @@ class S2graphSink(queryName: String, conf: TaskConf) 
extends Sink(queryName, con
   override val FORMAT: String = 
"org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
   private def bulkload(df: DataFrame): Unit = {
-    val options = S2GraphHelper.toGraphFileOptions(conf)
+    val options = TaskConf.toGraphFileOptions(conf)
     val config = Management.toConfig(options.toConfigParams)
     val input = df.rdd
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index e8f11e3..ddd56bf 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -20,7 +20,16 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.s2graph.s2jobs.Logger
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
+object TaskConf {
+  def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = {
+    val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys)
+      .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray
+
+    GraphFileOptions.toOption(args)
+  }
+}
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, 
options:Map[String, String] = Map.empty)
 
 trait Task extends Serializable with Logger {

Reply via email to