simplify Transformer interface.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ae19dc11 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ae19dc11 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ae19dc11 Branch: refs/heads/master Commit: ae19dc11c039c92deb92ea42fdb48b3bbe7bd6dd Parents: b5535eb Author: DO YUNG YOON <[email protected]> Authored: Mon Apr 2 17:38:09 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Apr 2 17:38:09 2018 +0900 ---------------------------------------------------------------------- .../s2graph/s2jobs/loader/HFileGenerator.scala | 6 ++ .../s2jobs/loader/HFileMRGenerator.scala | 6 ++ .../loader/LocalBulkLoaderTransformer.scala | 28 +++----- .../loader/SparkBulkLoaderTransformer.scala | 49 ++++++------- .../SparkGraphElementLoaderTransformer.scala | 75 -------------------- .../s2jobs/serde/GraphElementWritable.scala | 4 ++ .../s2graph/s2jobs/serde/Transformer.scala | 19 ++--- .../s2jobs/serde/writer/KeyValueWriter.scala | 6 +- .../org/apache/s2graph/s2jobs/task/Sink.scala | 55 ++++++++------ .../s2jobs/loader/GraphFileGeneratorTest.scala | 16 ++++- 10 files changed, 107 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index b4ac51f..431631b 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName} import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -113,6 +115,10 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] { rdd: RDD[String], _options: GraphFileOptions): Unit = { val transformer = new SparkBulkLoaderTransformer(config, _options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = transformer.transform(rdd).flatMap(kvs => kvs) HFileGenerator.generateHFile(sc, config, kvs, _options) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala index 3502bee..87968f0 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala @@ -34,6 +34,8 @@ import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat} import org.apache.hadoop.mapreduce.{Job, Mapper} +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -105,6 +107,10 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] { input: RDD[String], options: GraphFileOptions): RDD[KeyValue] = { val transformer = new SparkBulkLoaderTransformer(s2Config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter + transformer.transform(input).flatMap(kvs => kvs) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala index 7d405a6..ad3483c 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala @@ -20,41 +20,31 @@ package org.apache.s2graph.s2jobs.loader import com.typesafe.config.Config -import org.apache.hadoop.hbase.KeyValue import org.apache.s2graph.core.{GraphElement, S2Graph} -import org.apache.s2graph.s2jobs.serde.Transformer -import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer} import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} import scala.concurrent.ExecutionContext +import scala.reflect.ClassTag class LocalBulkLoaderTransformer(val config: Config, - val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] { + val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] { val s2: S2Graph = S2GraphHelper.initS2Graph(config) - override val reader = new TsvBulkFormatReader - override val writer = new KeyValueWriter - - override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_)) - - override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_)) - - override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = { + override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = { val degrees = elements.flatMap { element => DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) }.groupBy(_._1).mapValues(_.map(_._2).sum) degrees.toSeq.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) + writer.writeDegree(s2)(degreeKey, count) } } - override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = { - val elements = read(input) - val kvs = write(elements) - - val degrees = if (options.buildDegree) buildDegrees(elements) else Nil + override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = { + val elements = 
input.flatMap(reader.read(s2)(_)) + val kvs = elements.map(writer.write(s2)(_)) + val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil kvs ++ degrees } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala index cd991e1..03d9784 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala @@ -20,35 +20,17 @@ package org.apache.s2graph.s2jobs.loader import com.typesafe.config.Config -import org.apache.hadoop.hbase.{KeyValue => HKeyValue} import org.apache.s2graph.core.GraphElement -import org.apache.s2graph.s2jobs.serde.Transformer -import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer} import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} import org.apache.spark.rdd.RDD -class SparkBulkLoaderTransformer(val config: Config, - val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] { - val reader = new TsvBulkFormatReader - - val writer = new KeyValueWriter - - override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) +import scala.reflect.ClassTag - iter.flatMap { line => - reader.read(s2)(line) - } - } - - override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map(writer.write(s2)(_)) - } +class SparkBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions) extends Transformer[RDD] { - override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = { + override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = { val degrees = elements.mapPartitions { iter => val s2 = S2GraphHelper.initS2Graph(config) @@ -61,16 +43,27 @@ class SparkBulkLoaderTransformer(val config: Config, val s2 = S2GraphHelper.initS2Graph(config) iter.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) + writer.writeDegree(s2)(degreeKey, count) } } } - override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = { - val elements = read(input) - val kvs = write(elements) + override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = { + val elements = input.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { line => + reader.read(s2)(line) + } + } + + val kvs = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map(writer.write(s2)(_)) + } if (options.buildDegree) kvs ++ buildDegrees(elements) - kvs + else kvs } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala 
---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala deleted file mode 100644 index fcf8d4c..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.s2jobs.loader - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.{KeyValue => HKeyValue} -import org.apache.s2graph.core.GraphElement -import org.apache.s2graph.s2jobs.serde.Transformer -import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader} -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row - -class SparkGraphElementLoaderTransformer(val config: Config, - val options: GraphFileOptions) extends Transformer[Row, Seq[HKeyValue], org.apache.spark.rdd.RDD] { - val reader = new RowBulkFormatReader - - val writer = new KeyValueWriter - - override def read(input: RDD[Row]): RDD[GraphElement] = input.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.flatMap(reader.read(s2)(_)) - } - - override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map(writer.write(s2)(_)) - } - - override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = { - val degrees = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.flatMap { element => - DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) - } - }.reduceByKey(_ + _) - - degrees.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) - } - } - } - - override def transform(input: RDD[Row]): RDD[Seq[HKeyValue]] = { - val elements = read(input) - val kvs = write(elements) - - if (options.buildDegree) kvs ++ buildDegrees(elements) - kvs - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala index ae082d8..f71a9e8 100644 --- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala @@ -20,7 +20,11 @@ package org.apache.s2graph.s2jobs.serde import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.DegreeKey trait GraphElementWritable[T] extends Serializable { + def write(s2: S2Graph)(element: GraphElement): T + + def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): T } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala index 3902c63..ef1bd29 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala @@ -23,28 +23,21 @@ import com.typesafe.config.Config import org.apache.s2graph.core.GraphElement import org.apache.s2graph.s2jobs.loader.GraphFileOptions +import scala.reflect.ClassTag + /** * Define serialize/deserialize. * Source -> GraphElement * GraphElement -> Target * - * @tparam S : Source class. ex) String, RDF.Statement, ... - * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ... * @tparam M : Container type. ex) RDD, Seq, List, ... */ -trait Transformer[S, T, M[_]] extends Serializable { +trait Transformer[M[_]] extends Serializable { val config: Config val options: GraphFileOptions - val reader: GraphElementReadable[S] - - val writer: GraphElementWritable[T] - - def read(input: M[S]): M[GraphElement] - - def write(elements: M[GraphElement]): M[T] - - def buildDegrees(elements: M[GraphElement]): M[T] + def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T] - def transform(input: M[S]): M[T] + def transform[S: ClassTag, T: ClassTag](input: M[S]) + (implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): M[T] } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala index 02034af..cc1f801 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.s2jobs.serde.writer import org.apache.hadoop.hbase.KeyValue import org.apache.s2graph.core.{GraphElement, S2Graph} -import org.apache.s2graph.s2jobs.S2GraphHelper +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} import org.apache.s2graph.s2jobs.serde.GraphElementWritable class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] { @@ -30,4 +30,8 @@ class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritab new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value) } } + + override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Seq[KeyValue] = { + DegreeKey.toKeyValue(s2, degreeKey, count) + } } 
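The net effect of the three serde changes above: Transformer keeps only the container type M[_], and each transform/buildDegrees call now picks up its serialization from implicit GraphElementReadable[S] / GraphElementWritable[T] instances, while GraphElementWritable additionally covers degree rows via writeDegree. As a rough sketch of what the widened writer contract asks of an implementation (StringWriter below is hypothetical and not part of this commit; only the trait and DegreeKey come from the diff):

import org.apache.s2graph.core.{GraphElement, S2Graph}
import org.apache.s2graph.s2jobs.DegreeKey
import org.apache.s2graph.s2jobs.serde.GraphElementWritable

// Hypothetical target type: render every element as one line of text.
class StringWriter extends GraphElementWritable[String] {
  override def write(s2: S2Graph)(element: GraphElement): String =
    element.toString

  // Degree entries now go through the same writer, instead of being
  // hard-wired to DegreeKey.toKeyValue inside each transformer.
  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): String =
    s"$degreeKey\t$count"
}

With something like this in scope, buildDegrees[String] would emit the same type as the element rows, which is what lets transform return a single M[T] containing both regular cells and degree cells.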
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index b7a91d9..bc67822 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -20,29 +20,30 @@ package org.apache.s2graph.s2jobs.task import com.typesafe.config.Config -import org.apache.s2graph.core.{GraphElement, Management} +import org.apache.s2graph.core.Management import org.apache.s2graph.s2jobs.S2GraphHelper -import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer, SparkGraphElementLoaderTransformer} -import org.apache.s2graph.s2jobs.serde.GraphElementReadable -import org.apache.spark.rdd.RDD +import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer} +import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger} import org.elasticsearch.spark.sql.EsSparkSQL /** * Sink + * * @param queryName * @param conf */ -abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { +abstract class Sink(queryName: String, override val conf: TaskConf) extends Task { val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}" val DEFAULT_TRIGGER_INTERVAL = "10 seconds" - val FORMAT:String + val FORMAT: String - def preprocess(df:DataFrame):DataFrame = df + def preprocess(df: DataFrame): DataFrame = df - def write(inputDF: DataFrame):Unit = { + def write(inputDF: DataFrame): Unit = { val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) if (inputDF.isStreaming) writeStream(df.writeStream) @@ -56,7 +57,7 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { case "update" => OutputMode.Update() case "complete" => OutputMode.Complete() case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. 
use default output mode 'append'") - OutputMode.Append() + OutputMode.Append() } val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL) val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION) @@ -94,9 +95,9 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { writer.save(outputPath) } - protected def repartition(df:DataFrame, defaultParallelism:Int) = { + protected def repartition(df: DataFrame, defaultParallelism: Int) = { conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match { - case Some(numOfPartitions:Int) => + case Some(numOfPartitions: Int) => if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions) else df.coalesce(numOfPartitions) case None => df @@ -106,14 +107,16 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task { /** * KafkaSink + * * @param queryName * @param conf */ -class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class KafkaSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic") + override val FORMAT: String = "kafka" - override def preprocess(df:DataFrame):DataFrame = { + override def preprocess(df: DataFrame): DataFrame = { import org.apache.spark.sql.functions._ logger.debug(s"${LOG_PREFIX} schema: ${df.schema}") @@ -124,7 +127,7 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { val columns = df.columns df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value")) - case format:String => + case format: String => if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format") df.selectExpr("to_json(struct(*)) AS value") } @@ -136,21 +139,25 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { /** * FileSink + * * @param queryName * @param conf */ -class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class FileSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("path", "format") + override val FORMAT: String = conf.options.getOrElse("format", "parquet") } /** * HiveSink + * * @param queryName * @param conf */ -class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("database", "table") + override val FORMAT: String = "hive" override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = { @@ -167,11 +174,13 @@ class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { /** * ESSink + * * @param queryName * @param conf */ -class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port") + override val FORMAT: String = "es" override def write(inputDF: DataFrame): Unit = { @@ -188,16 +197,18 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { /** * S2graphSink + * * @param queryName * @param conf */ -class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) { +class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) { override def 
mandatoryOptions: Set[String] = Set() + override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider" private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction") - override def write(inputDF: DataFrame):Unit = { + override def write(inputDF: DataFrame): Unit = { val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism) if (inputDF.isStreaming) writeStream(df.writeStream) @@ -206,7 +217,11 @@ class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf) val input = df.rdd - val transformer = new SparkGraphElementLoaderTransformer(config, bulkLoadOptions) + val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions) + + implicit val reader = new RowBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = transformer.transform(input) HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala index baf9b32..c382813 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala @@ -22,6 +22,8 @@ package org.apache.s2graph.s2jobs.loader import org.apache.s2graph.core.PostProcess import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue} import org.apache.s2graph.s2jobs.BaseSparkTest +import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader} +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter import org.apache.spark.rdd.RDD import play.api.libs.json.Json @@ -36,6 +38,10 @@ class GraphFileGeneratorTest extends BaseSparkTest { case "spark" => val input: RDD[String] = sc.parallelize(edges) val transformer = new SparkBulkLoaderTransformer(s2Config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = transformer.transform(input) kvs.flatMap { kvs => kvs.map { kv => @@ -46,6 +52,10 @@ class GraphFileGeneratorTest extends BaseSparkTest { case "local" => val input = edges val transformer = new LocalBulkLoaderTransformer(s2Config, options) + + implicit val reader = new TsvBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = transformer.transform(input) kvs.flatMap { kvs => kvs.map { kv => @@ -68,7 +78,11 @@ class GraphFileGeneratorTest extends BaseSparkTest { e.getDirection()) }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd - val transformer = new SparkGraphElementLoaderTransformer(s2Config, options) + val transformer = new SparkBulkLoaderTransformer(s2Config, options) + + implicit val reader = new RowBulkFormatReader + implicit val writer = new KeyValueWriter + val kvs = transformer.transform(rows) kvs.flatMap { kvs => kvs.map { kv =>
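Taken together, the callers updated in this commit (HFileGenerator, HFileMRGenerator, S2graphSink, and the tests) all follow the same pattern: construct a transformer for the container type, expose a reader/writer pair as implicits, and let transform infer S and T from them. A condensed sketch of that caller-side usage, mirroring the HFileGenerator change (the toKeyValues helper name is illustrative; config, options, and bulkEdges stand in for values the surrounding job provides):

import com.typesafe.config.Config
import org.apache.hadoop.hbase.KeyValue
import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, SparkBulkLoaderTransformer}
import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
import org.apache.spark.rdd.RDD

def toKeyValues(bulkEdges: RDD[String], config: Config, options: GraphFileOptions): RDD[KeyValue] = {
  val transformer = new SparkBulkLoaderTransformer(config, options)

  implicit val reader = new TsvBulkFormatReader   // S = String (TSV bulk format)
  implicit val writer = new KeyValueWriter        // T = Seq[KeyValue] (HFile cells)

  // RDD[Seq[KeyValue]] -> RDD[KeyValue], ready for HFileGenerator.generateHFile
  transformer.transform(bulkEdges).flatMap(kvs => kvs)
}

Switching the input format is then just a matter of swapping the implicit reader — S2graphSink uses a RowBulkFormatReader for DataFrame rows with the very same SparkBulkLoaderTransformer — which is what allowed the dedicated SparkGraphElementLoaderTransformer to be deleted.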
