Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 5c42b1cf6 -> 1294c1822
add wal package, WalLog class, UserDefinedAggregateFunction.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/bbc64682
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/bbc64682
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/bbc64682

Branch: refs/heads/master
Commit: bbc64682dee2fa6cd269ee2b90a010ec29cacd45
Parents: 07a5af3
Author: DO YUNG YOON <[email protected]>
Authored: Mon Jul 16 18:58:32 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Jul 16 18:58:32 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/Schema.scala      |  72 ++++++--
 .../org/apache/s2graph/s2jobs/task/Source.scala |   6 +-
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  |  31 ++++
 .../process/S2EdgeDataAggregateProcess.scala    |  33 ++++
 .../S2EdgeDataArrayAggregateProcess.scala       |  29 ++++
 .../s2jobs/wal/udafs/S2EdgeDataAggregate.scala  | 166 +++++++++++++++++++
 .../S2EdgeDataAggregateProcessTest.scala        |  87 ++++++++++
 7 files changed, 410 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
index 58d3368..1c47f13 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Schema.scala
@@ -22,10 +22,19 @@ package org.apache.s2graph.s2jobs
 import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
 
 object Schema {
-  val BulkLoadSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    */
+  val CommonFields = Seq(
+    StructField("timestamp", LongType, nullable = false),
+    StructField("operation", StringType, nullable = false),
+    StructField("elem", StringType, nullable = false)
+  )
+
+  val BulkLoadSchema = StructType(CommonFields ++ Seq(
     StructField("from", StringType, false),
     StructField("to", StringType, false),
     StructField("label", StringType, false),
@@ -33,24 +42,63 @@ object Schema {
     StructField("direction", StringType, true)
   ))
 
-  val VertexSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+  /**
+    * root
+    * |-- timestamp: long (nullable = true)
+    * |-- operation: string (nullable = true)
+    * |-- elem: string (nullable = true)
+    * |-- id: string (nullable = true)
+    * |-- service: string (nullable = true)
+    * |-- column: string (nullable = true)
+    * |-- props: string (nullable = true)
+    */
+  val VertexSchema = StructType(CommonFields ++ Seq(
     StructField("id", StringType, false),
     StructField("service", StringType, false),
     StructField("column", StringType, false),
     StructField("props", StringType, false)
   ))
 
-  val EdgeSchema = StructType(Seq(
-    StructField("timestamp", LongType, false),
-    StructField("operation", StringType, false),
-    StructField("elem", StringType, false),
+
+  /**
+    * root
+    * |-- timestamp: long (nullable = true)
+    * |-- operation: string (nullable = true)
+    * |-- elem: string (nullable = true)
+    * |-- from: string (nullable = true)
+    * |-- to: string (nullable = true)
+    * |-- label: string (nullable = true)
+    * |-- props: string (nullable = true)
+    * |-- direction: string (nullable = true)
+    */
+  val EdgeSchema = StructType(CommonFields ++ Seq(
     StructField("from", StringType, false),
     StructField("to", StringType, false),
     StructField("label", StringType, false),
     StructField("props", StringType, false),
     StructField("direction", StringType, true)
   ))
+
+  /**
+    * root
+    * |-- timestamp: long (nullable = false)
+    * |-- operation: string (nullable = false)
+    * |-- elem: string (nullable = false)
+    * |-- id: string (nullable = true)
+    * |-- service: string (nullable = true)
+    * |-- column: string (nullable = true)
+    * |-- from: string (nullable = true)
+    * |-- to: string (nullable = true)
+    * |-- label: string (nullable = true)
+    * |-- props: string (nullable = true)
+    */
+  val GraphElementSchema = StructType(CommonFields ++ Seq(
+    StructField("id", StringType, nullable = true),
+    StructField("service", StringType, nullable = true),
+    StructField("column", StringType, nullable = true),
+    StructField("from", StringType, nullable = true),
+    StructField("to", StringType, nullable = true),
+    StructField("label", StringType, nullable = true),
+    StructField("props", StringType, nullable = true)
+  ))
 }


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index bfac62b..5bbf166 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,12 +19,13 @@ package org.apache.s2graph.s2jobs.task
 
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.{JSONParser, Management}
 import org.apache.s2graph.s2jobs.Schema
 import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
 import org.apache.s2graph.s2jobs.serde.writer.RowDataFrameWriter
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.{JsObject, Json}
 
 
 /**
@@ -103,8 +104,9 @@ class FileSource(conf:TaskConf) extends Source(conf) {
       case "edgeLog" =>
         ss.read.format("com.databricks.spark.csv").option("delimiter", "\t")
           .schema(BulkLoadSchema).load(paths: _*)
-      case _ => ss.read.format(format).load(paths: _*)
+      case _ => val df = ss.read.format(format).load(paths: _*)
+        if (columnsOpt.isDefined) df.toDF(columnsOpt.get.split(",").map(_.trim): _*) else df
     }
   }
 


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
new file mode 100644
index 0000000..1b70a8a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -0,0 +1,31 @@
+package org.apache.s2graph.s2jobs.wal
+
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+
+case class WalLog(timestamp:Long,
+                  operation:String,
+                  elem:String,
+                  from:String,
+                  to:String,
+                  service:String,
+                  label:String,
+                  props:String) {
+  val id = from
+  val columnName = label
+  val serviceName = to
+}
+
+object WalLog {
+  val WalLogSchema = StructType(Seq(
+    StructField("timestamp", LongType, false),
+    StructField("operation", StringType, false),
+    StructField("elem", StringType, false),
+    StructField("from", StringType, false),
+    StructField("to", StringType, false),
+    StructField("service", StringType, true),
+    StructField("label", StringType, false),
+    StructField("props", StringType, false)
+    // StructField("direction", StringType, true)
+  ))
+}
+


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
new file mode 100644
index 0000000..a1d17f1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+  * expect S2EdgeData dataframe as input.
+  * @param taskConf
+  */
+class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_))
+    val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("timestamp,to,label,props").split(",").map(col(_))
+    taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
+
+    val aggregator = new GroupByAgg(maxNumOfEdges)
+
+    val edges = inputMap(taskConf.inputs.head)
+
+    edges
+      .groupBy(groupByColumns: _*)
+      .agg(
+        aggregator(aggregateColumns: _*).as("edges"),
+        max(col("timestamp")).as("max_ts"),
+        min(col("timestamp")).as("min_ts")
+      )
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
new file mode 100644
index 0000000..08df269
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataArrayAggregateProcess.scala
@@ -0,0 +1,29 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.udafs.GroupByArrayAgg
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+class S2EdgeDataArrayAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    import ss.sqlContext.implicits._
+    val maxNumOfEdges = taskConf.options.get("maxNumOfEdges").map(_.toInt).getOrElse(1000)
+    val groupByColumns = taskConf.options.get("groupByColumns").getOrElse("from").split(",").map(col(_))
+    val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("edges").split(",").map(col(_))
+    taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
+    val aggregator = new GroupByArrayAgg(maxNumOfEdges)
+
+    val edges = inputMap(taskConf.inputs.head)
+
+    edges
+      .groupBy(groupByColumns: _*)
+      .agg(
+        aggregator(aggregateColumns: _*).as("edges"),
+        max(col("max_ts")).as("max_ts"),
+        min(col("min_ts")).as("min_ts")
+      )
+  }
+
+  override def mandatoryOptions: Set[String] = Set.empty
+}


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
new file mode 100644
index 0000000..1b12235
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
@@ -0,0 +1,166 @@
+package org.apache.s2graph.s2jobs.wal.udafs
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+object S2EdgeDataAggregate {
+  type Element = (Long, String, String, String)
+
+  val emptyRow = new GenericRow(Array(-1L, "empty", "empty", "empty"))
+
+  val elementOrd = Ordering.by[Element, Long](_._1)
+
+  val rowOrdering = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = {
+      x.getAs[Long](0).compareTo(y.getAs[Long](0))
+    }
+  }
+
+  val rowOrderingDesc = new Ordering[Row] {
+    override def compare(x: Row, y: Row): Int = {
+      -x.getAs[Long](0).compareTo(y.getAs[Long](0))
+    }
+  }
+
+  val fields = Seq(
+    StructField(name = "timestamp", LongType),
+    StructField(name = "to", StringType),
+    StructField(name = "label", StringType),
+    StructField(name = "props", StringType)
+  )
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  def apply(maxNumOfEdges: Int = 1000): GroupByAgg = {
+    new GroupByAgg(maxNumOfEdges)
+  }
+
+  def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = {
+    import scala.collection.mutable
+    val (n, m) = (cur.size, prev.size)
+
+    var (i, j) = (0, 0)
+    var idx = 0
+    val arr = new mutable.ArrayBuffer[T](size)
+
+    while (idx < size && i < n && j < m) {
+      if (ordering.compare(cur(i), prev(j)) > 0) {
+        arr += cur(i)
+        i += 1
+      } else {
+        arr += prev(j)
+        j += 1
+      }
+      idx += 1
+    }
+    while (idx < size && i < n) {
+      arr += cur(i)
+      i += 1
+    }
+    while (idx < size && j < m) {
+      arr += prev(j)
+      j += 1
+    }
+
+    arr
+  }
+}
+
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  implicit val ord = rowOrderingDesc
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  override def inputSchema: StructType = StructType(fields)
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType),
+    StructField(name = "buffered", dataType = BooleanType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, scala.collection.mutable.ListBuffer.empty[Element])
+  }
+
+  /* not optimized */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val element = input
+
+    val prev = buffer.getAs[Seq[Row]](0)
+    val appended = prev :+ element
+
+    buffer.update(0, appended)
+    buffer.update(1, false)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int) = {
+    val sorted = ls.sorted
+    if (sorted.size <= k) sorted else sorted.take(k)
+  }
+
+  /* not optimized */
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    buffer1.update(0, takeTopK(prev ++ cur, maxNumOfEdges))
+    buffer1.update(1, true)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[Seq[Row]](0)
+    val buffered = buffer.getAs[Boolean](1)
+    if (buffered) ls
+    else takeTopK(ls, maxNumOfEdges)
+  }
+}
+
+class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  implicit val ord = rowOrdering
+
+  import scala.collection.mutable
+
+  override def inputSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit =
+    buffer.update(0, mutable.ListBuffer.empty[Row])
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val cur = input.getAs[Seq[Row]](0)
+    val prev = buffer.getAs[Seq[Row]](0)
+    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
+
+    buffer.update(0, merged)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    val cur = buffer2.getAs[Seq[Row]](0)
+    val prev = buffer1.getAs[Seq[Row]](0)
+
+    buffer1.update(0, mergeTwoSeq(cur, prev, maxNumOfEdges))
+  }
+
+  override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)
+}


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bbc64682/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
new file mode 100644
index 0000000..72c8cf5
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLog
+import org.apache.s2graph.s2jobs.wal.udafs._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
+  val walLogsLs = Seq(
+    WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(2L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(3L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(4L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+  val walLogsLs2 = Seq(
+    WalLog(5L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
+    WalLog(6L, "insert", "edge", "a", "c", "s2graph", "friends", """{"name": 2}"""),
+    WalLog(7L, "insert", "edge", "a", "d", "s2graph", "friends", """{"name": 3}"""),
+    WalLog(8L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 4}""")
+  )
+
+
+  test("test S2EdgeDataAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset((0 until 10).flatMap(ith => walLogsLs)).toDF()
+    val inputMap = Map("edges" -> edges)
+    val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10",
+        "groupByAggClassName" -> "GroupByAgg"))
+
+    val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
+    val processed = job.execute(spark, inputMap)
+
+    processed.printSchema()
+    processed.collect().foreach { row =>
+      println(row)
+    }
+  }
+
+  test("test S2EdgeDataArrayAggregateProcess") {
+    import spark.sqlContext.implicits._
+
+    val edges = spark.createDataset(walLogsLs).toDF()
+    val edges2 = spark.createDataset(walLogsLs2).toDF()
+
+    val firstConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
+      options = Map("maxNumOfEdges" -> "10"))
+
+    val firstJob = new S2EdgeDataAggregateProcess(firstConf)
+    val firstJob2 = new S2EdgeDataAggregateProcess(firstConf)
+
+    val first = firstJob.execute(spark, Map("edges" -> edges))
+    val first2 = firstJob2.execute(spark, Map("edges" -> edges2))
+
+    val secondInputMap = Map(
+      "aggregated" -> first.union(first2)
+    )
+
+    val secondConf = new TaskConf(name = "testarray", `type` = "agg",
+      inputs = Seq("aggregated"),
+      options = Map("maxNumOfEdges" -> "10"))
+
+    val secondJob = new S2EdgeDataArrayAggregateProcess(secondConf)
+
+
+    val processed = secondJob.execute(spark, secondInputMap)
+
+    processed.printSchema()
+    processed.collect().foreach { row =>
+      println(row)
+    }
+  }
+
+  test("mergeTwoSeq") {
+    val prev: Array[Int] = Array(3, 2, 1)
+    val cur: Array[Int] = Array(4, 2, 2)
+
+    val ls = S2EdgeDataAggregate.mergeTwoSeq(prev, cur, 10)
+    println(ls.size)
+
+    ls.foreach { x =>
+      println(x)
+    }
+  }
+}
\ No newline at end of file
