Filter TaskConf options to GraphFileOptions.OptionKeys before parsing GraphFileOptions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c129ed01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c129ed01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c129ed01 Branch: refs/heads/master Commit: c129ed01fed984faebff3eff640d4c1eab249c98 Parents: 1ac70d5 Author: DO YUNG YOON <[email protected]> Authored: Tue Apr 3 15:41:13 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Apr 3 15:41:13 2018 +0900 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala | 9 --------- .../org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala | 6 ++++++ .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala | 2 +- .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala | 9 +++++++++ 4 files changed, 16 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala index 69b3716..6e68d28 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala @@ -24,10 +24,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.SKeyValue import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId} -import org.apache.s2graph.s2jobs.loader.GraphFileOptions -import org.apache.s2graph.s2jobs.task.TaskConf import org.apache.spark.sql.Row -import 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.StructType import play.api.libs.json.{JsObject, Json} @@ -85,12 +82,6 @@ object S2GraphHelper { } } - def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = { - val args = taskConf.options.flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray - - GraphFileOptions.toOption(args) - } - def sparkSqlRowToGraphElement(s2: S2Graph, row: Row, schema: StructType, reservedColumn: Set[String]): Option[GraphElement] = { val timestamp = row.getAs[Long]("timestamp") val operation = Try(row.getAs[String]("operation")).getOrElse("insert") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala index 4bf8379..8aa540c 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala @@ -20,6 +20,12 @@ package org.apache.s2graph.s2jobs.loader object GraphFileOptions { + val OptionKeys = Set( + "--input", "--tempDir", "--output", "--zkQuorum", "--table", "--dbUrl", "--dbUser", "--dbPassword", "--dbDriver", + "--maxHFilePerRegionServer", "--numRegions", "--labelMapping", "--autoEdgeCreate", "--buildDegree", "--incrementalLoad", + "--method" + ) + val parser = new scopt.OptionParser[GraphFileOptions]("run") { opt[String]('i', "input").required().action( (x, c) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 8c57539..0dce19a 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -210,7 +210,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider" private def bulkload(df: DataFrame): Unit = { - val options = S2GraphHelper.toGraphFileOptions(conf) + val options = TaskConf.toGraphFileOptions(conf) val config = Management.toConfig(options.toConfigParams) val input = df.rdd http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c129ed01/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala index e8f11e3..ddd56bf 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala @@ -20,7 +20,16 @@ package org.apache.s2graph.s2jobs.task import org.apache.s2graph.s2jobs.Logger +import org.apache.s2graph.s2jobs.loader.GraphFileOptions +object TaskConf { + def toGraphFileOptions(taskConf: TaskConf): GraphFileOptions = { + val args = taskConf.options.filterKeys(GraphFileOptions.OptionKeys) + .flatMap(kv => Seq(kv._1, kv._2)).toSeq.toArray + + GraphFileOptions.toOption(args) + } +} case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty) trait Task extends Serializable with Logger {
