pass local cache configuration on S2GraphSource.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b4dab3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b4dab3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b4dab3a3

Branch: refs/heads/master
Commit: b4dab3a328c457d5910ce2ade738aea2f002f30e
Parents: 5a862aa
Author: DO YUNG YOON <[email protected]>
Authored: Tue Apr 17 16:30:24 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Apr 17 16:36:17 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/s2jobs/JobDescription.scala  |  1 +
 .../s2jobs/loader/LocalBulkLoaderTransformer.scala        |  3 ++-
 .../s2jobs/loader/SparkBulkLoaderTransformer.scala        |  9 +++++----
 .../main/scala/org/apache/s2graph/s2jobs/task/Sink.scala  | 10 +++++++---
 .../scala/org/apache/s2graph/s2jobs/task/Source.scala     |  7 ++++---
 .../main/scala/org/apache/s2graph/s2jobs/task/Task.scala  |  7 +++++--
 .../scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala   |  3 ++-
 7 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 3be318d..6abbe86 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -50,6 +50,7 @@ object JobDescription extends Logger {
       case "kafka" => new KafkaSource(conf)
       case "file" => new FileSource(conf)
       case "hive" => new HiveSource(conf)
+      case "s2graph" => new S2GraphSource(conf)
       case _ => throw new IllegalArgumentException(s"unsupported source type : ${conf.`type`}")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index ad3483c..7b37509 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -23,13 +23,14 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 
 import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
                                  val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+  val s2: S2Graph = S2SinkContext(config).getGraph
 
   override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
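Note: both bulk-load transformers now obtain their S2Graph through S2SinkContext(config).getGraph instead of building a fresh one per call with S2GraphHelper.initS2Graph. A minimal sketch of the caching idea, assuming S2SinkContext keeps one graph per JVM and re-initializes only when the config changes (the object and field names below are illustrative, not the actual implementation):

    import com.typesafe.config.Config
    import org.apache.s2graph.core.S2Graph
    import org.apache.s2graph.s2jobs.S2GraphHelper

    // Hypothetical single-slot cache in the spirit of S2SinkContext:
    // repeated lookups with the same Config reuse one S2Graph instance.
    object CachedGraphSketch {
      private var cached: Option[(Config, S2Graph)] = None

      def getGraph(config: Config): S2Graph = synchronized {
        cached match {
          case Some((c, g)) if c == config => g
          case _ =>
            val g = S2GraphHelper.initS2Graph(config) // same factory the old code called directly
            cached = Some((config, g))
            g
        }
      }
    }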
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 5f8d3e5..95847f9 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -23,6 +23,7 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.rdd.RDD
 
 import scala.reflect.ClassTag
 
@@ -40,7 +41,7 @@ class SparkBulkLoaderTransformer(val config: Config,
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.flatMap { element =>
         DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
@@ -48,7 +49,7 @@ class SparkBulkLoaderTransformer(val config: Config,
     }.reduceByKey(_ + _)
 
     degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.map { case (degreeKey, count) =>
         writer.writeDegree(s2)(degreeKey, count)
@@ -58,7 +59,7 @@ class SparkBulkLoaderTransformer(val config: Config,
   override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
     val elements = input.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.flatMap { line =>
         reader.read(s2)(line)
@@ -66,7 +67,7 @@ class SparkBulkLoaderTransformer(val config: Config,
     }
 
     val kvs = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2SinkContext(config).getGraph
 
       iter.map(writer.write(s2)(_))
     }
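Note: each mapPartitions closure above runs on an executor, so the old code initialized a fresh S2Graph for every partition in every stage. With the cached context, partitions scheduled on the same executor JVM can share one instance. A rough sketch of the access pattern, assuming the one-graph-per-JVM behavior described earlier (the RDD and its contents are illustrative):

    import com.typesafe.config.Config
    import org.apache.spark.rdd.RDD
    import org.apache.s2graph.spark.sql.streaming.S2SinkContext

    // Illustrative only: the graph is looked up inside the closure because
    // S2Graph is not serializable; a JVM-level cache keeps the cost to one
    // initialization per executor rather than one per partition.
    def tagWithGraph(lines: RDD[String], config: Config): RDD[String] =
      lines.mapPartitions { iter =>
        val s2 = S2SinkContext(config).getGraph // shared across partitions on this executor
        iter.map(line => s"${s2.hashCode()}: $line")
      }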
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 07a626d..0760dc6 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -216,7 +216,9 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   private def writeBatchBulkload(df: DataFrame, runLoadIncrementalHFiles: Boolean = true): Unit = {
     val options = TaskConf.toGraphFileOptions(conf)
-    val config = Management.toConfig(options.toConfigParams)
+    val config = Management.toConfig(TaskConf.parseLocalCacheConfigs(conf) ++
+      TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++ options.toConfigParams)
+
     val input = df.rdd
 
     val transformer = new SparkBulkLoaderTransformer(config, options)
@@ -236,7 +238,9 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
     import scala.collection.JavaConversions._
    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 
-    val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
+    // TODO: FIX THIS. overwrite local cache config.
+    val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
+    val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
     val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise())
 
     val reader = new RowBulkFormatReader
@@ -246,7 +250,7 @@ class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
     df.foreachPartition { iters =>
       val config = ConfigFactory.parseString(serializedConfig)
-      val s2Graph = S2GraphHelper.initS2Graph(config)
+      val s2Graph = S2SinkContext(config).getGraph
 
       val responses = iters.grouped(groupedSize).flatMap { rows =>
         val elements = rows.flatMap(row => reader.read(s2Graph)(row))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index dc5c054..f21acd1 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -113,7 +113,8 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
   override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
 
   override def toDF(ss: SparkSession): DataFrame = {
-    val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf)
+    val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf) ++
+      TaskConf.parseLocalCacheConfigs(conf)
     val config = Management.toConfig(mergedConf)
 
     val snapshotPath = conf.options("hbase.rootdir")
@@ -126,17 +127,17 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
       if (columnFamily == "v") false
       else conf.options.getOrElse("build.degree", "false").toBoolean
     val elementType = conf.options.getOrElse("element.type", "IndexEdge")
+    val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
 
     val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath, restorePath, tableNames,
       columnFamily, elementType, batchSize, labelMapping, buildDegree)
-
     implicit val reader = new S2GraphCellReader(elementType)
     implicit val writer = new RowDataFrameWriter
+
     val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
 
     val kvs = transformer.transform(cells)
-    val schema = if (columnFamily == "v") Schema.VertexSchema else Schema.EdgeSchema
 
     ss.sqlContext.createDataFrame(kvs, schema)
   }
 }
\ No newline at end of file
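Note: S2GraphSource (wired up in JobDescription above) reads an HBase table snapshot back into a DataFrame, and with this commit its "cache."-prefixed options reach the graph config alongside the hbase.* and db.* options. A hypothetical task configuration; option values are placeholders and the cache key name is an example, not a definitive list:

    import org.apache.spark.sql.SparkSession
    import org.apache.s2graph.s2jobs.task.{S2GraphSource, TaskConf}

    val ss = SparkSession.builder().master("local[*]").appName("restore").getOrCreate()

    val sourceConf = TaskConf(
      name = "restore_src",
      `type` = "s2graph",
      options = Map(
        "hbase.rootdir"     -> "hdfs:///hbase",  // mandatory
        "restore.path"      -> "/tmp/restore",   // mandatory
        "hbase.table.names" -> "s2graph",        // mandatory
        "cache.max.size"    -> "10000"           // example key; forwarded via parseLocalCacheConfigs below
      )
    )

    // Vertices or edges from the snapshot, as a DataFrame.
    val df = new S2GraphSource(sourceConf).toDF(ss)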
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index 6ba2468..89c8dcd 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -30,14 +30,17 @@ object TaskConf {
     GraphFileOptions.toOption(args)
   }
 
-  def parseHBaseConfigs(taskConf: TaskConf): Map[String, String] = {
+  def parseHBaseConfigs(taskConf: TaskConf): Map[String, Any] = {
     taskConf.options.filterKeys(_.startsWith("hbase."))
   }
 
-  def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, String] = {
+  def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, Any] = {
     taskConf.options.filterKeys(_.startsWith("db."))
   }
+
+  def parseLocalCacheConfigs(taskConf: TaskConf): Map[String, Any] = {
+    taskConf.options.filterKeys(_.startsWith("cache.")).mapValues(_.toInt)
+  }
 }
 
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b4dab3a3/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 0461d1e..765bfb0 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -27,6 +27,7 @@ import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
@@ -65,7 +66,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D
     // initialize spark context.
     super.beforeAll()
 
-    s2 = S2GraphHelper.initS2Graph(s2Config)
+    s2 = S2SinkContext(s2Config).getGraph
 
    initTestDataFile
  }
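Note: the new TaskConf.parseLocalCacheConfigs keeps only the "cache."-prefixed options and converts each value with toInt, so cache values must be numeric strings or the parse throws NumberFormatException. A quick illustration (key names and values are examples):

    import org.apache.s2graph.s2jobs.task.TaskConf

    val conf = TaskConf(
      name = "sink",
      `type` = "s2graph",
      options = Map(
        "cache.max.size"    -> "10000",
        "cache.ttl.seconds" -> "600",
        "db.default.url"    -> "jdbc:mysql://localhost/graph_dev" // filtered out: no "cache." prefix
      )
    )

    TaskConf.parseLocalCacheConfigs(conf)
    // => Map("cache.max.size" -> 10000, "cache.ttl.seconds" -> 600): Map[String, Any]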
