add column options for FileSource
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/bf05d528 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/bf05d528 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/bf05d528 Branch: refs/heads/master Commit: bf05d5288307c2d2099d3269ef598077638973b2 Parents: 9fb1fee Author: Chul Kang <[email protected]> Authored: Tue Apr 10 17:46:58 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Wed Apr 11 02:57:55 2018 +0900 ---------------------------------------------------------------------- s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala | 3 +++ 1 file changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bf05d528/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala index d2ca8ef..1801fe9 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala @@ -81,12 +81,15 @@ class FileSource(conf:TaskConf) extends Source(conf) { import org.apache.s2graph.s2jobs.Schema._ val paths = conf.options("paths").split(",") val format = conf.options.getOrElse("format", DEFAULT_FORMAT) + val columnsOpt = conf.options.get("columns") format match { case "edgeLog" => ss.read.format("com.databricks.spark.csv").option("delimiter", "\t") .schema(BulkLoadSchema).load(paths: _*) case _ => ss.read.format(format).load(paths: _*) + val df = ss.read.format(format).load(paths: _*) + if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df } } }
