bug fix on S2EdgeDataFrameWriter.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/31b51929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/31b51929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/31b51929

Branch: refs/heads/master
Commit: 31b51929c4073d761bb03541001e07216cf8faa8
Parents: a0ce6f8
Author: DO YUNG YOON <[email protected]>
Authored: Thu Apr 5 19:00:23 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Apr 5 19:00:53 2018 +0900

----------------------------------------------------------------------
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  2 +-
 .../s2jobs/serde/GraphElementReadable.scala     |  2 +-
 .../s2jobs/serde/reader/IdentityReader.scala    |  4 +-
 .../serde/reader/RowBulkFormatReader.scala      |  4 +-
 .../s2jobs/serde/reader/S2GraphCellReader.scala | 64 +++++---------
 .../serde/reader/TsvBulkFormatReader.scala      |  4 +-
 .../writer/GraphElementDataFrameWriter.scala    | 49 -----------
 .../serde/writer/S2EdgeDataFrameWriter.scala    | 50 +++++++++++
 .../serde/writer/S2VertexDataFrameWriter.scala  | 51 ++++++++++++
 .../org/apache/s2graph/s2jobs/task/Source.scala | 24 ++++--
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 88 ++++++++++++++++++--
 11 files changed, 227 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index c105448..a0c14e0 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -37,7 +37,7 @@ import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.{GraphElementDataFrameWriter, IdentityWriter, KeyValueWriter}
+import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, IdentityWriter, KeyValueWriter}
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
index 0544a84..148dc66 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala
@@ -22,5 +22,5 @@ package org.apache.s2graph.s2jobs.serde
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 
 trait GraphElementReadable[S] extends Serializable {
-  def read(graph: S2Graph)(data: S): Option[GraphElement]
+  def read(graph: S2Graph)(data: S): Seq[GraphElement]
 }
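The contract change above is the pivot of this commit: GraphElementReadable.read
now returns Seq[GraphElement] instead of Option[GraphElement], because one input
record (for example an HBase row holding several index edges) can decode to many
elements. A minimal standalone sketch (toy types, not the s2graph API) of why
Seq composes more cleanly than Option here:

object ReaderContractSketch {
  trait Readable[S] extends Serializable {
    def read(data: S): Seq[String] // was: Option[String]
  }

  // One "row" of raw cells may decode to several elements.
  val cellReader: Readable[List[String]] = new Readable[List[String]] {
    def read(cells: List[String]): Seq[String] = cells.map(c => s"edge($c)")
  }

  def main(args: Array[String]): Unit = {
    val rows = List(List("q1", "q2"), Nil, List("q3"))
    // flatMap flattens 0..n elements per row; Option could only yield 0..1.
    println(rows.flatMap(cellReader.read)) // List(edge(q1), edge(q2), edge(q3))
  }
}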
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
index a4d985b..8652170 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
@@ -23,6 +23,6 @@ import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
 class IdentityReader extends GraphElementReadable[GraphElement] {
-  override def read(graph: S2Graph)(data: GraphElement): Option[GraphElement] =
-    Option(data)
+  override def read(graph: S2Graph)(data: GraphElement): Seq[GraphElement] =
+    Seq(data)
 }


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
index 12b2ba2..de8a365 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.Row
 class RowBulkFormatReader extends GraphElementReadable[Row] {
   private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
 
-  override def read(s2: S2Graph)(row: Row): Option[GraphElement] =
-    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN)
+  override def read(s2: S2Graph)(row: Row): Seq[GraphElement] =
+    S2GraphHelper.sparkSqlRowToGraphElement(s2, row, row.schema, RESERVED_COLUMN).toSeq
 }


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
index f5487ab..454294e 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
@@ -27,54 +27,28 @@ import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
 class S2GraphCellReader(elementType: String) extends GraphElementReadable[Seq[Cell]]{
-  override def read(s2: S2Graph)(cells: Seq[Cell]): Option[GraphElement] = {
-    if (cells.isEmpty) None
-    else {
-      //TODO:
-      val cell = cells.head
-      val schemaVer = HBaseType.DEFAULT_VERSION
-      val cf = cell.getFamily
-
-      val kvs = cells.map { cell =>
-        new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
-          cell.getValue, cell.getTimestamp, SKeyValue.Default)
-      }
-      elementType match {
-        case "IndexEdge" =>
-          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
-            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as e")
+  override def read(s2: S2Graph)(cells: Seq[Cell]): Seq[GraphElement] = {
+    val schemaVer = HBaseType.DEFAULT_VERSION
+    val kvs = cells.map { cell =>
+      new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
+        cell.getValue, cell.getTimestamp, SKeyValue.Default)
+    }
+    elementType.toLowerCase match {
+      case "vertex" | "v" =>
+        s2.defaultStorage.serDe.vertexDeserializer(schemaVer)
+          .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement]).toSeq
+      case "indexedge" | "ie" =>
+        kvs.flatMap { kv =>
           s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer)
-            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-        case "SnapshotEdge" =>
-          //TODO: replace this to use separate column family: SKeyValue.SnapshotEdgeCF
-          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
-            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as e")
-
+            .fromKeyValues(Seq(kv), None).map(_.asInstanceOf[GraphElement])
+        }
+      case "snapshotedge" | "se" =>
+        kvs.flatMap { kv =>
           s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer)
-            .fromKeyValues(kvs, None).map(_.toEdge.asInstanceOf[GraphElement])
-        case "Vertex" =>
-          if (!Bytes.equals(cf, SKeyValue.VertexCf))
-            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as v")
-
-          s2.defaultStorage.serDe.vertexDeserializer(schemaVer)
-            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-        case _ => throw new IllegalArgumentException(s"$elementType is not supported column family.")
-      }
-//      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
-//        s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-//      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
-//        val indexEdgeOpt = s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
-//        if (indexEdgeOpt.isDefined) indexEdgeOpt.map(_.asInstanceOf[GraphElement])
-//        else {
-//          //TODO: Current version use same column family for snapshotEdge and indexEdge.
-//
-//          val snapshotEdgeOpt = s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
-////          if (snapshotEdgeOpt.isDefined) snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
-////          else throw new IllegalStateException(s"column family indicate this is edge, but neither snapshot/index edge.")
-//          None
-//        }
-//      } else throw new IllegalStateException(s"wrong column family. ${Bytes.toString(cf)}")
+            .fromKeyValues(Seq(kv), None).map(_.asInstanceOf[GraphElement])
+        }
+      case _ => throw new IllegalArgumentException(s"$elementType is not supported.")
     }
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
index 5465517..963a7d8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala
@@ -23,7 +23,7 @@ import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
 class TsvBulkFormatReader extends GraphElementReadable[String] {
-  override def read(graph: S2Graph)(data: String): Option[GraphElement] = {
-    graph.elementBuilder.toGraphElement(data)
+  override def read(graph: S2Graph)(data: String): Seq[GraphElement] = {
+    graph.elementBuilder.toGraphElement(data).toSeq
   }
 }
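The S2GraphCellReader rewrite above appears to carry the functional fix: the old
code looked only at cells.head to pick a column family and handed the whole
KeyValue batch to the deserializer at once, so at most one element came back per
HBase row. The new code deserializes index/snapshot edges one KeyValue at a time
and flattens the results. A toy sketch (stand-in types, not the s2graph serde
API) of the difference:

object PerKvSketch {
  final case class Kv(qualifier: String)

  // Stand-in for a deserializer that returns at most one element per call.
  def deserialize(kvs: Seq[Kv]): Option[String] =
    kvs.headOption.map(kv => s"IndexEdge(${kv.qualifier})")

  def main(args: Array[String]): Unit = {
    val kvs = Seq(Kv("a->b"), Kv("a->c")) // two index edges under one row key
    println(deserialize(kvs).toSeq)                  // before: only IndexEdge(a->b); second edge lost
    println(kvs.flatMap(kv => deserialize(Seq(kv)))) // after: both edges recovered
  }
}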
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
deleted file mode 100644
index b6cbbb3..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.serde.writer
-
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.DegreeKey
-import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-
-object GraphElementDataFrameWriter {
-  type GraphElementTuple = (Long, String, String, String, String, String, String, String)
-  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
-}
-
-class GraphElementDataFrameWriter extends GraphElementWritable[GraphElementDataFrameWriter.GraphElementTuple] {
-  import GraphElementDataFrameWriter._
-  private def toGraphElementTuple(tokens: Array[String]): GraphElementTuple = {
-    tokens match {
-      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, elem, from, to, label, props, dir)
-      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, elem, from, to, label, props, "out")
-      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
-    }
-  }
-  override def write(s2: S2Graph)(element: GraphElement): GraphElementTuple = {
-    toGraphElementTuple(element.toLogString().split("\t"))
-  }
-
-  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): GraphElementTuple = {
-    val element = DegreeKey.toEdge(s2, degreeKey, count)
-
-    toGraphElementTuple(element.toLogString().split("\t"))
-  }
-}


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
new file mode 100644
index 0000000..c2c305c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2EdgeDataFrameWriter.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.s2graph.s2jobs.serde.writer.S2EdgeDataFrameWriter.S2EdgeTuple
+
+object S2EdgeDataFrameWriter {
+  type S2EdgeTuple = (Long, String, String, String, String, String, String, String)
+  val Fields = Seq("timestamp", "operation", "elem", "from", "to", "label", "props", "direction")
+}
+
+class S2EdgeDataFrameWriter extends GraphElementWritable[S2EdgeTuple] {
+  import S2EdgeDataFrameWriter._
+  private def toGraphElementTuple(tokens: Array[String]): S2EdgeTuple = {
+    tokens match {
+      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, elem, from, to, label, props, dir)
+      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, elem, from, to, label, props, "out")
+      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
+    }
+  }
+  override def write(s2: S2Graph)(element: GraphElement): S2EdgeTuple = {
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): S2EdgeTuple = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+}
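For context, each S2EdgeTuple produced by this writer becomes one DataFrame row
named by Fields. A self-contained Spark sketch of that mapping (the local
session, object name, and sample values are illustrative, not part of this
commit):

import org.apache.spark.sql.SparkSession

object EdgeTupleToDFSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Same shape as S2EdgeDataFrameWriter.S2EdgeTuple / Fields.
    val fields = Seq("timestamp", "operation", "elem", "from", "to", "label", "props", "direction")
    val tuples = Seq((1416236400000L, "insert", "edge", "a", "b", "friends", "{}", "out"))

    tuples.toDF(fields: _*).show(truncate = false)
    spark.stop()
  }
}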
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
new file mode 100644
index 0000000..c37f78e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/S2VertexDataFrameWriter.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph, S2VertexLike}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.s2graph.s2jobs.serde.writer.S2VertexDataFrameWriter.S2VertexTuple
+
+object S2VertexDataFrameWriter {
+  type S2VertexTuple = (Long, String, String, String, String, String, String)
+  val EmptyS2VertexTuple = (0L, "", "", "", "", "", "")
+  val Fields = Seq("timestamp", "operation", "elem", "id", "service", "column", "props")
+}
+
+class S2VertexDataFrameWriter extends GraphElementWritable[S2VertexTuple] {
+  import S2VertexDataFrameWriter._
+  private def toVertexTuple(tokens: Array[String]): S2VertexTuple = {
+    tokens match {
+      case Array(ts, op, elem, id, service, column, props) => (ts.toLong, op, elem, id, service, column, props)
+      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
+    }
+  }
+  override def write(s2: S2Graph)(element: GraphElement): S2VertexTuple = {
+    element match {
+      case v: S2VertexLike => toVertexTuple(v.toLogString().split("\t"))
+      case _ => throw new IllegalArgumentException(s"Vertex expected, $element is not vertex.")
+    }
+
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): S2VertexTuple =
+    EmptyS2VertexTuple
+}


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 7b52415..06a28c8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.s2jobs.task
 import org.apache.s2graph.core.{GraphElement, Management}
 import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
-import org.apache.s2graph.s2jobs.serde.writer.GraphElementDataFrameWriter
+import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, S2VertexDataFrameWriter}
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
 
@@ -122,7 +122,9 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
     val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
     val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
     val labelMapping = Map.empty[String, String]
-    val buildDegree = conf.options.getOrElse("build.degree", "false").toBoolean
+    val buildDegree =
+      if (columnFamily == "v") false
+      else conf.options.getOrElse("build.degree", "false").toBoolean
     val elementType = conf.options.getOrElse("element.type", "IndexEdge")
 
     val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
@@ -130,11 +132,21 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
 
     implicit val reader = new S2GraphCellReader(elementType)
-    implicit val writer = new GraphElementDataFrameWriter
-    val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
-    val kvs = transformer.transform(cells)
+    columnFamily match {
+      case "v" =>
+        implicit val writer = new S2VertexDataFrameWriter
+        val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
+        val kvs = transformer.transform(cells)
 
-    kvs.toDF(GraphElementDataFrameWriter.Fields: _*)
+        kvs.toDF(S2VertexDataFrameWriter.Fields: _*)
+      case "e" =>
+        implicit val writer = new S2EdgeDataFrameWriter
+        val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
+        val kvs = transformer.transform(cells)
+
+        kvs.toDF(S2EdgeDataFrameWriter.Fields: _*)
+      case _ => throw new IllegalArgumentException(s"$columnFamily is not supported.")
+    }
   }
 }
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/31b51929/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index 6e712a6..9cd52eb 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -20,16 +20,28 @@ package org.apache.s2graph.s2jobs.task
 
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.core.{GraphUtil, S2EdgeLike, S2VertexLike}
 import org.apache.s2graph.core.storage.hbase.{AsynchbaseStorage, AsynchbaseStorageManagement}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.{S2EdgeDataFrameWriter, S2VertexDataFrameWriter}
 import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper}
 import org.apache.spark.sql.DataFrame
 
 import scala.collection.JavaConverters._
 
 class SourceTest extends BaseSparkTest {
-  def toDataFrame(edges: Seq[String]): DataFrame = {
+  //TODO: props to valid string.
+  def s2VertexToDataFrame(vertices: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = vertices.flatMap(s2.elementBuilder.toVertex(_))
+
+    elements.map { v =>
+      (v.ts, GraphUtil.fromOp(v.op),
+        "v", v.innerId.toIdString(), v.serviceName, v.columnName, "{}")
+    }.toDF(S2VertexDataFrameWriter.Fields: _*)
+  }
+
+  def s2EdgeToDataFrame(edges: Seq[String]): DataFrame = {
     import spark.sqlContext.implicits._
     val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
 
@@ -42,21 +54,24 @@ class SourceTest extends BaseSparkTest {
         e.label(),
         "{}",
         e.getDirection())
-    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+    }.toDF(S2EdgeDataFrameWriter.Fields: _*)
   }
 
-
-  test("S2GraphSource toDF") {
+  test("S2GraphSource edge toDF") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
     val snapshotTableName = options.tableName + "-snapshot"
 
     // 1. run S2GraphSink to build(not actually load by using LoadIncrementalLoad) bulk load file.
-    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
-    val df = toDataFrame(Seq(bulkEdgeString))
+    val bulkEdges = Seq(
+      "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}",
+      "1416236400000\tinsert\tedge\ta\tc\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    )
+    val df = s2EdgeToDataFrame(bulkEdges)
 
     val reader = new RowBulkFormatReader
 
     val inputEdges = df.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
 
     val args = options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
@@ -92,10 +107,69 @@ class SourceTest extends BaseSparkTest {
     val source = new S2GraphSource(dumpConf)
     val realDF = source.toDF(spark)
     val outputEdges = realDF.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2EdgeLike].tgtVertex.innerId.toIdString())
 
     inputEdges.foreach { e => println(s"[Input]: $e")}
     outputEdges.foreach { e => println(s"[Output]: $e")}
 
     inputEdges shouldBe outputEdges
   }
+
+  test("S2GraphSource vertex toDF") {
+    val column = initTestVertexSchema(s2)
+    val snapshotTableName = options.tableName + "-snapshot"
+
+    val bulkVertices = Seq(
+      s"1416236400000\tinsert\tvertex\tc\t${column.service.serviceName}\t${column.columnName}\t{}",
+      s"1416236400000\tinsert\tvertex\td\t${column.service.serviceName}\t${column.columnName}\t{}"
+    )
+    val df = s2VertexToDataFrame(bulkVertices)
+
+    val reader = new RowBulkFormatReader
+
+    val input = df.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
+
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> "true")
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2GraphSink("testQuery", conf)
+    sink.write(df)
+
+    // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
+    s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
+      import scala.collection.JavaConverters._
+      if (admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
+        admin.deleteSnapshot(snapshotTableName)
+
+      admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
+    }
+
+    // 3. Decode S2GraphSource to parse HFile
+    val metaAndHBaseArgs = options.toConfigParams
+    val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
+
+    val dumpArgs = Map(
+      "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
+      "restore.path" -> "/tmp/hbase_restore",
+      "hbase.table.names" -> s"${snapshotTableName}",
+      "hbase.table.cf" -> "v",
+      "element.type" -> "Vertex"
+    ) ++ metaAndHBaseArgs
+
+    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
+    val source = new S2GraphSource(dumpConf)
+    val realDF = source.toDF(spark)
+
+    val output = realDF.collect().flatMap(reader.read(s2)(_))
+      .sortBy(_.asInstanceOf[S2VertexLike].innerId.toIdString())
+
+    input.foreach { e => println(s"[Input]: $e")}
+    output.foreach { e => println(s"[Output]: $e")}
+
+    input shouldBe output
+  }
 }
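As the two tests show, S2GraphSource is driven entirely by its option map. An
illustrative summary of the dump options follows (the rootdir and path values
are placeholders, not from this commit; key semantics come from Source.scala
and S2GraphCellReader above):

object SnapshotDumpArgsSketch {
  val vertexDumpArgs: Map[String, String] = Map(
    "hbase.rootdir"     -> "hdfs://localhost:8020/hbase", // taken from the cluster's HBase config
    "restore.path"      -> "/tmp/hbase_restore",
    "hbase.table.names" -> "s2graph-snapshot",
    "hbase.table.cf"    -> "v",     // "v" -> S2VertexDataFrameWriter, "e" -> S2EdgeDataFrameWriter
    "element.type"      -> "Vertex" // matched case-insensitively: vertex|v, indexedge|ie, snapshotedge|se
  )
}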
