simplify Transformer interface.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ae19dc11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ae19dc11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ae19dc11

Branch: refs/heads/master
Commit: ae19dc11c039c92deb92ea42fdb48b3bbe7bd6dd
Parents: b5535eb
Author: DO YUNG YOON <[email protected]>
Authored: Mon Apr 2 17:38:09 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Apr 2 17:38:09 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  6 ++
 .../s2jobs/loader/HFileMRGenerator.scala        |  6 ++
 .../loader/LocalBulkLoaderTransformer.scala     | 28 +++-----
 .../loader/SparkBulkLoaderTransformer.scala     | 49 ++++++-------
 .../SparkGraphElementLoaderTransformer.scala    | 75 --------------------
 .../s2jobs/serde/GraphElementWritable.scala     |  4 ++
 .../s2graph/s2jobs/serde/Transformer.scala      | 19 ++---
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  6 +-
 .../org/apache/s2graph/s2jobs/task/Sink.scala   | 55 ++++++++------
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 16 ++++-
 10 files changed, 107 insertions(+), 157 deletions(-)
----------------------------------------------------------------------

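For readers of this patch, the net effect on the API: Transformer is now parameterized only by the container type M[_], and the source/target serde is supplied at each call as implicit GraphElementReadable / GraphElementWritable instances instead of being fixed as trait members. A call site looks roughly like the sketch below (mirroring the HFileGenerator change further down; config, options, and inputRdd are placeholder names for whatever the caller already has in scope):

    import org.apache.s2graph.s2jobs.loader.SparkBulkLoaderTransformer
    import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
    import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter

    val transformer = new SparkBulkLoaderTransformer(config, options)

    // serde is chosen at the call site via implicits rather than baked into the transformer
    implicit val reader = new TsvBulkFormatReader   // String -> GraphElement
    implicit val writer = new KeyValueWriter        // GraphElement -> Seq[KeyValue]

    // RDD[String] -> RDD[Seq[KeyValue]], flattened to RDD[KeyValue]
    val kvs = transformer.transform(inputRdd).flatMap(kvs => kvs)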

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b4ac51f..431631b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -113,6 +115,10 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
                         rdd: RDD[String],
                         _options: GraphFileOptions): Unit = {
     val transformer = new SparkBulkLoaderTransformer(config, _options)
+
+    implicit val reader = new TsvBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
     val kvs = transformer.transform(rdd).flatMap(kvs => kvs)
 
     HFileGenerator.generateHFile(sc, config, kvs, _options)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 3502bee..87968f0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -34,6 +34,8 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, Mapper}
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
@@ -105,6 +107,10 @@ object HFileMRGenerator extends RawFileGenerator[String, KeyValue] {
                input: RDD[String],
                options: GraphFileOptions): RDD[KeyValue] = {
     val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+    implicit val reader = new TsvBulkFormatReader
+    implicit val writer = new KeyValueWriter
+
     transformer.transform(input).flatMap(kvs => kvs)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index 7d405a6..ad3483c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -20,41 +20,31 @@
 package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 
 import scala.concurrent.ExecutionContext
+import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
+                                 val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] {
   val s2: S2Graph = S2GraphHelper.initS2Graph(config)
 
-  override val reader = new TsvBulkFormatReader
-  override val writer = new KeyValueWriter
-
-  override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_))
-
-  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_))
-
-  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = {
+  override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>
       DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
     }.groupBy(_._1).mapValues(_.map(_._2).sum)
 
     degrees.toSeq.map { case (degreeKey, count) =>
-      DegreeKey.toKeyValue(s2, degreeKey, count)
+      writer.writeDegree(s2)(degreeKey, count)
     }
   }
 
-  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
+  override def transform[S: ClassTag, T: ClassTag](input: Seq[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): Seq[T] = {
+    val elements = input.flatMap(reader.read(s2)(_))
+    val kvs = elements.map(writer.write(s2)(_))
+    val degrees = if (options.buildDegree) buildDegrees[T](elements) else Nil
 
     kvs ++ degrees
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index cd991e1..03d9784 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -20,35 +20,17 @@
 package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.serde.{GraphElementReadable, GraphElementWritable, Transformer}
 import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 import org.apache.spark.rdd.RDD
 
-class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new TsvBulkFormatReader
-
-  val writer = new KeyValueWriter
-
-  override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
+import scala.reflect.ClassTag
 
-    iter.flatMap { line =>
-      reader.read(s2)(line)
-    }
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
+class SparkBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions) extends Transformer[RDD] {
 
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
+  override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
       val s2 = S2GraphHelper.initS2Graph(config)
 
@@ -61,16 +43,27 @@ class SparkBulkLoaderTransformer(val config: Config,
       val s2 = S2GraphHelper.initS2Graph(config)
 
       iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
+        writer.writeDegree(s2)(degreeKey, count)
       }
     }
   }
 
-  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
+  override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
+    val elements = input.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { line =>
+        reader.read(s2)(line)
+      }
+    }
+
+    val kvs = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map(writer.write(s2)(_))
+    }
 
     if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
+    else kvs
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
deleted file mode 100644
index fcf8d4c..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkGraphElementLoaderTransformer.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-
-class SparkGraphElementLoaderTransformer(val config: Config,
-                                         val options: GraphFileOptions) extends Transformer[Row, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new RowBulkFormatReader
-
-  val writer = new KeyValueWriter
-
-  override def read(input: RDD[Row]): RDD[GraphElement] = input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.flatMap(reader.read(s2)(_))
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
-
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = {
-    val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
-      }
-    }.reduceByKey(_ + _)
-
-    degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
-      }
-    }
-  }
-
-  override def transform(input: RDD[Row]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
-  }
-}

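The Row-specific transformer deleted above is not lost functionality: as the Sink and test changes further down show, the same path is now expressed by pairing the generic SparkBulkLoaderTransformer with a RowBulkFormatReader. Roughly (config, bulkLoadOptions, and df are the values already in scope in S2graphSink.write):

    val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions)

    implicit val reader = new RowBulkFormatReader   // Row -> GraphElement
    implicit val writer = new KeyValueWriter        // GraphElement -> Seq[KeyValue]

    // RDD[Row] -> RDD[Seq[KeyValue]]
    val kvs = transformer.transform(df.rdd)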
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
index ae082d8..f71a9e8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala
@@ -20,7 +20,11 @@
 package org.apache.s2graph.s2jobs.serde
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
 
 trait GraphElementWritable[T] extends Serializable {
+
   def write(s2: S2Graph)(element: GraphElement): T
+
+  def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): T
 }

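With writeDegree added, GraphElementWritable fully describes the output side, so targets other than HBase KeyValues can be plugged in without touching any transformer. A hypothetical writer (sketch only, not part of this patch) that renders elements as plain strings:

    import org.apache.s2graph.core.{GraphElement, S2Graph}
    import org.apache.s2graph.s2jobs.DegreeKey
    import org.apache.s2graph.s2jobs.serde.GraphElementWritable

    class PlainStringWriter extends GraphElementWritable[String] {
      // GraphElement -> String; toString is used here purely for illustration
      override def write(s2: S2Graph)(element: GraphElement): String = element.toString

      // degree rows become tab-separated "<degreeKey>\t<count>" lines
      override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): String =
        s"$degreeKey\t$count"
    }

    // used exactly like KeyValueWriter:
    //   implicit val writer = new PlainStringWriter
    //   transformer.transform(input)   // yields M[String]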
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index 3902c63..ef1bd29 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -23,28 +23,21 @@ import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 
+import scala.reflect.ClassTag
+
 /**
   * Define serialize/deserialize.
   * Source -> GraphElement
   * GraphElement -> Target
   *
-  * @tparam S : Source class. ex) String, RDF.Statement, ...
-  * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ...
   * @tparam M : Container type. ex) RDD, Seq, List, ...
   */
-trait Transformer[S, T, M[_]] extends Serializable {
+trait Transformer[M[_]] extends Serializable {
   val config: Config
   val options: GraphFileOptions
 
-  val reader: GraphElementReadable[S]
-
-  val writer: GraphElementWritable[T]
-
-  def read(input: M[S]): M[GraphElement]
-
-  def write(elements: M[GraphElement]): M[T]
-
-  def buildDegrees(elements: M[GraphElement]): M[T]
+  def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]
 
-  def transform(input: M[S]): M[T]
+  def transform[S: ClassTag, T: ClassTag](input: M[S])
+                           (implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): M[T]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 02034af..cc1f801 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.s2jobs.serde.writer
 
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
 import org.apache.s2graph.s2jobs.serde.GraphElementWritable
 
 class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] {
@@ -30,4 +30,8 @@ class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritab
       new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
     }
   }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Seq[KeyValue] = {
+    DegreeKey.toKeyValue(s2, degreeKey, count)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index b7a91d9..bc67822 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,29 +20,30 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.Config
-import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer, SparkGraphElementLoaderTransformer}
-import org.apache.s2graph.s2jobs.serde.GraphElementReadable
-import org.apache.spark.rdd.RDD
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
 /**
   * Sink
+  *
   * @param queryName
   * @param conf
   */
-abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
+abstract class Sink(queryName: String, override val conf: TaskConf) extends Task {
   val DEFAULT_CHECKPOINT_LOCATION = s"/tmp/streamingjob/${queryName}/${conf.name}"
   val DEFAULT_TRIGGER_INTERVAL = "10 seconds"
 
-  val FORMAT:String
+  val FORMAT: String
 
-  def preprocess(df:DataFrame):DataFrame = df
+  def preprocess(df: DataFrame): DataFrame = df
 
-  def write(inputDF: DataFrame):Unit = {
+  def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
@@ -56,7 +57,7 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
       case "update" => OutputMode.Update()
       case "complete" => OutputMode.Complete()
       case _ => logger.warn(s"${LOG_PREFIX} unsupported output mode. use default output mode 'append'")
-                OutputMode.Append()
+        OutputMode.Append()
     }
     val interval = conf.options.getOrElse("interval", DEFAULT_TRIGGER_INTERVAL)
     val checkpointLocation = conf.options.getOrElse("checkpointLocation", DEFAULT_CHECKPOINT_LOCATION)
@@ -94,9 +95,9 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
     writer.save(outputPath)
   }
 
-  protected def repartition(df:DataFrame, defaultParallelism:Int) = {
+  protected def repartition(df: DataFrame, defaultParallelism: Int) = {
     conf.options.get("numPartitions").map(n => Integer.parseInt(n)) match {
-      case Some(numOfPartitions:Int) =>
+      case Some(numOfPartitions: Int) =>
         if (numOfPartitions > defaultParallelism) df.repartition(numOfPartitions)
         else df.coalesce(numOfPartitions)
       case None => df
@@ -106,14 +107,16 @@ abstract class Sink(queryName:String, override val conf:TaskConf) extends Task {
 
 /**
   * KafkaSink
+  *
   * @param queryName
   * @param conf
   */
-class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class KafkaSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("kafka.bootstrap.servers", "topic")
+
   override val FORMAT: String = "kafka"
 
-  override def preprocess(df:DataFrame):DataFrame = {
+  override def preprocess(df: DataFrame): DataFrame = {
     import org.apache.spark.sql.functions._
 
     logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
@@ -124,7 +127,7 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
         val columns = df.columns
         df.select(concat_ws(delimiter, columns.map(c => col(c)): _*).alias("value"))
-      case format:String =>
+      case format: String =>
         if (format != "json") logger.warn(s"${LOG_PREFIX} unsupported format '$format'. use default json format")
         df.selectExpr("to_json(struct(*)) AS value")
     }
@@ -136,21 +139,25 @@ class KafkaSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * FileSink
+  *
   * @param queryName
   * @param conf
   */
-class FileSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class FileSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("path", "format")
+
   override val FORMAT: String = conf.options.getOrElse("format", "parquet")
 }
 
 /**
   * HiveSink
+  *
   * @param queryName
   * @param conf
   */
-class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class HiveSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("database", "table")
+
   override val FORMAT: String = "hive"
 
   override protected def writeBatchInner(writer: DataFrameWriter[Row]): Unit = {
@@ -167,11 +174,13 @@ class HiveSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * ESSink
+  *
   * @param queryName
   * @param conf
   */
-class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class ESSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set("es.nodes", "path", "es.port")
+
   override val FORMAT: String = "es"
 
   override def write(inputDF: DataFrame): Unit = {
@@ -188,16 +197,18 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
 
 /**
   * S2graphSink
+  *
   * @param queryName
   * @param conf
   */
-class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
   override def mandatoryOptions: Set[String] = Set()
+
   override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
 
   private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
 
-  override def write(inputDF: DataFrame):Unit = {
+  override def write(inputDF: DataFrame): Unit = {
     val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
 
     if (inputDF.isStreaming) writeStream(df.writeStream)
@@ -206,7 +217,11 @@ class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf)
       val bulkLoadOptions: GraphFileOptions = S2GraphHelper.toGraphFileOptions(conf)
       val input = df.rdd
 
-      val transformer = new SparkGraphElementLoaderTransformer(config, bulkLoadOptions)
+      val transformer = new SparkBulkLoaderTransformer(config, bulkLoadOptions)
+
+      implicit val reader = new RowBulkFormatReader
+      implicit val writer = new KeyValueWriter
+
       val kvs = transformer.transform(input)
 
       HFileGenerator.generateHFile(df.sparkSession.sparkContext, config, kvs.flatMap(ls => ls), bulkLoadOptions)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ae19dc11/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index baf9b32..c382813 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -22,6 +22,8 @@ package org.apache.s2graph.s2jobs.loader
 import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.s2jobs.serde.reader.{RowBulkFormatReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
 import org.apache.spark.rdd.RDD
 import play.api.libs.json.Json
 
@@ -36,6 +38,10 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       case "spark" =>
         val input: RDD[String] = sc.parallelize(edges)
         val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new TsvBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
@@ -46,6 +52,10 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       case "local" =>
         val input = edges
         val transformer = new LocalBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new TsvBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(input)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
@@ -68,7 +78,11 @@ class GraphFileGeneratorTest extends BaseSparkTest {
             e.getDirection())
         }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction").rdd
 
-        val transformer = new SparkGraphElementLoaderTransformer(s2Config, options)
+        val transformer = new SparkBulkLoaderTransformer(s2Config, options)
+
+        implicit val reader = new RowBulkFormatReader
+        implicit val writer = new KeyValueWriter
+
         val kvs = transformer.transform(rows)
         kvs.flatMap { kvs =>
           kvs.map { kv =>
