Add test case for S2GraphSource.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a0ce6f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a0ce6f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a0ce6f8e

Branch: refs/heads/master
Commit: a0ce6f8e578d33151651b70593c75d1c99c57cf3
Parents: 30137d4
Author: DO YUNG YOON <[email protected]>
Authored: Wed Apr 4 17:47:36 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Apr 4 17:47:36 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/S2EdgeLike.scala    |  7 +-
 .../org/apache/s2graph/core/S2VertexLike.scala  |  5 +-
 .../s2graph/s2jobs/loader/HFileGenerator.scala  | 23 +++----
 .../loader/SparkBulkLoaderTransformer.scala     | 14 +++-
 .../s2graph/s2jobs/serde/Transformer.scala      |  1 -
 .../s2jobs/serde/reader/IdentityReader.scala    | 19 ++++++
 .../s2jobs/serde/reader/S2GraphCellReader.scala | 67 ++++++++++++++----
 .../s2jobs/serde/writer/DataFrameWriter.scala   | 25 --------
 .../writer/GraphElementDataFrameWriter.scala    | 49 ++++++++++++++
 .../s2jobs/serde/writer/IdentityWriter.scala    | 19 ++++++
 .../org/apache/s2graph/s2jobs/task/Source.scala | 29 +++++++--
 .../org/apache/s2graph/s2jobs/task/Task.scala   |  9 +++
 .../s2graph/s2jobs/S2GraphHelperTest.scala      |  2 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  |  2 +-
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 52 ++++++++++---
 15 files changed, 245 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index f2ea4ad..f581e52 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -250,6 +250,11 @@ trait S2EdgeLike extends Edge with GraphElement {
 
   def toLogString: String = {
 //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
-    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
+    val propsWithName = for {
+      (k, v) <- propsWithTs.asScala.toMap
+      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
+    } yield (v.labelMeta.name -> jsValue)
+
+    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, Json.toJson(propsWithName)).mkString("\t")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 2fbc4f1..eb01da3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -61,8 +61,9 @@ trait S2VertexLike extends Vertex with GraphElement {
 
   def toLogString(): String = {
     val propsWithName = for {
-      (k, v) <- props.asScala
-    } yield (v.columnMeta.name -> v.value.toString)
+      (k, v) <- props.asScala.toMap
+      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
+    } yield (v.columnMeta.name -> jsValue)
 
     val (serviceName, columnName) =
       if (!id.storeColId) ("", "")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index ee6b842..c105448 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -33,11 +33,11 @@ import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.GraphElement
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.s2jobs.S2GraphHelper
 import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, TsvBulkFormatReader}
-import org.apache.s2graph.s2jobs.serde.writer.{DataFrameWriter, KeyValueWriter}
+import org.apache.s2graph.s2jobs.serde.writer.{GraphElementDataFrameWriter, IdentityWriter, KeyValueWriter}
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -141,24 +141,25 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
   def tableSnapshotDump(ss: SparkSession,
                         config: Config,
-                        options: GraphFileOptions,
                         snapshotPath: String,
                         restorePath: String,
                         tableNames: Seq[String],
                         columnFamily: String = "e",
-                        batchSize: Int = 1000): DataFrame = {
-    import ss.sqlContext.implicits._
-
+                        elementType: String = "IndexEdge",
+                        batchSize: Int = 1000,
+                        labelMapping: Map[String, String] = Map.empty,
+                        buildDegree: Boolean = false): RDD[Seq[Cell]] = {
     val cf = Bytes.toBytes(columnFamily)
     val hbaseConfig = HBaseConfiguration.create(ss.sparkContext.hadoopConfiguration)
     hbaseConfig.set("hbase.rootdir", snapshotPath)
 
     val initial = ss.sparkContext.parallelize(Seq.empty[Seq[Cell]])
 
-    val input = tableNames.foldLeft(initial) { case (prev, tableName) =>
+    tableNames.foldLeft(initial) { case (prev, tableName) =>
       val scan = new Scan
       scan.addFamily(cf)
       scan.setBatch(batchSize)
+      scan.setMaxVersions(1)
 
       TableSnapshotInputFormatImpl.setInput(hbaseConfig, tableName, new Path(restorePath))
 
      hbaseConfig.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
@@ -170,14 +171,6 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
       prev ++ current
     }
-
-    implicit val reader = new S2GraphCellReader
-    implicit val writer = new DataFrameWriter
-
-    val transformer = new SparkBulkLoaderTransformer(config, options)
-    val kvs = transformer.transform(input)
-
-    kvs.toDF(DataFrameWriter.Fields: _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 03d9784..5f8d3e5 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -28,14 +28,22 @@ import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 
 class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends Transformer[RDD] {
+                                 val labelMapping: Map[String, String] = Map.empty,
+                                 val buildDegree: Boolean = false) extends Transformer[RDD] {
+
+  val GraphElementEncoder = org.apache.spark.sql.Encoders.kryo[GraphElement]
+
+  implicit val encoder = GraphElementEncoder
+
+  def this(config: Config, options: GraphFileOptions) =
+    this(config, options.labelMapping, options.buildDegree)
 
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
       val s2 = S2GraphHelper.initS2Graph(config)
 
       iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
+        DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
       }
     }.reduceByKey(_ + _)
 
@@ -63,7 +71,7 @@ class SparkBulkLoaderTransformer(val config: Config,
       iter.map(writer.write(s2)(_))
     }
 
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    if (buildDegree) kvs ++ buildDegrees(elements)
     else kvs
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
index a448d3f..0b6dcba 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala
@@ -34,7 +34,6 @@ import scala.reflect.ClassTag
  */
 trait Transformer[M[_]] extends Serializable {
   val config: Config
-//  val options: GraphFileOptions
 
   def buildDegrees[T: ClassTag](elements: M[GraphElement])(implicit writer: GraphElementWritable[T]): M[T]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
index b4d1eb2..a4d985b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
index 868f0f2..f5487ab 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.reader
 
 import org.apache.hadoop.hbase.Cell
@@ -7,7 +26,7 @@ import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.serde.GraphElementReadable
 
-class S2GraphCellReader extends GraphElementReadable[Seq[Cell]]{
+class S2GraphCellReader(elementType: String) extends GraphElementReadable[Seq[Cell]]{
   override def read(s2: S2Graph)(cells: Seq[Cell]): Option[GraphElement] = {
     if (cells.isEmpty) None
     else {
@@ -20,18 +39,42 @@ class S2GraphCellReader extends GraphElementReadable[Seq[Cell]]{
         new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
           cell.getValue, cell.getTimestamp, SKeyValue.Default)
       }
 
+      elementType match {
+        case "IndexEdge" =>
+          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
+            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as e")
+
+          s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer)
+            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+        case "SnapshotEdge" =>
+          //TODO: replace this to use separate column family: SKeyValue.SnapshotEdgeCF
+          if (!Bytes.equals(cf, SKeyValue.EdgeCf))
+            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as e")
 
-      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
-        s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
-      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
-        val indexEdgeOpt = s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
-        if (indexEdgeOpt.isDefined) indexEdgeOpt.map(_.asInstanceOf[GraphElement])
-        else {
-          val snapshotEdgeOpt = s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
-          if (snapshotEdgeOpt.isDefined) snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
-          else throw new IllegalStateException(s"column family indicate this is edge, but neither snapshot/index edge.")
-        }
-      } else throw new IllegalStateException(s"wrong column family. ${Bytes.toString(cf)}")
+          s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer)
+            .fromKeyValues(kvs, None).map(_.toEdge.asInstanceOf[GraphElement])
+        case "Vertex" =>
+          if (!Bytes.equals(cf, SKeyValue.VertexCf))
+            throw new IllegalArgumentException(s"$elementType is provided by user, but actual column family differ as v")
+
+          s2.defaultStorage.serDe.vertexDeserializer(schemaVer)
+            .fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+        case _ => throw new IllegalArgumentException(s"$elementType is not supported column family.")
+      }
+//      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
+//        s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+//      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
+//        val indexEdgeOpt = s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
+//        if (indexEdgeOpt.isDefined) indexEdgeOpt.map(_.asInstanceOf[GraphElement])
+//        else {
+//          //TODO: Current version use same column family for snapshotEdge and indexEdge.
+//
+//          val snapshotEdgeOpt = s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
+////          if (snapshotEdgeOpt.isDefined) snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
+////          else throw new IllegalStateException(s"column family indicate this is edge, but neither snapshot/index edge.")
+//          None
+//        }
+//      } else throw new IllegalStateException(s"wrong column family. ${Bytes.toString(cf)}")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
deleted file mode 100644
index 458821e..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.s2graph.s2jobs.serde.writer
-
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.DegreeKey
-import org.apache.s2graph.s2jobs.serde.GraphElementWritable
-
-object DataFrameWriter {
-  type GraphElementTuple = (String, String, String, String, String, String, String, String)
-  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
-}
-
-class DataFrameWriter extends GraphElementWritable[DataFrameWriter.GraphElementTuple]{
-  override def write(s2: S2Graph)(element: GraphElement): (String, String, String, String, String, String, String, String) = {
-    val Array(ts, op, elem, from, to, label, props, dir) = element.toLogString().split("\t")
-
-    (ts, op, elem, from, to, label, props, dir)
-  }
-
-  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): (String, String, String, String, String, String, String, String) = {
-    val element = DegreeKey.toEdge(s2, degreeKey, count)
-    val Array(ts, op, elem, from, to, label, props, dir) = element.toLogString().split("\t")
-
-    (ts, op, elem, from, to, label, props, dir)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
new file mode 100644
index 0000000..b6cbbb3
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/GraphElementDataFrameWriter.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+object GraphElementDataFrameWriter {
+  type GraphElementTuple = (Long, String, String, String, String, String, String, String)
+  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+}
+
+class GraphElementDataFrameWriter extends GraphElementWritable[GraphElementDataFrameWriter.GraphElementTuple] {
+  import GraphElementDataFrameWriter._
+  private def toGraphElementTuple(tokens: Array[String]): GraphElementTuple = {
+    tokens match {
+      case Array(ts, op, elem, from, to, label, props, dir) => (ts.toLong, op, elem, from, to, label, props, dir)
+      case Array(ts, op, elem, from, to, label, props) => (ts.toLong, op, elem, from, to, label, props, "out")
+      case _ => throw new IllegalStateException(s"${tokens.toList} is malformed.")
+    }
+  }
+  override def write(s2: S2Graph)(element: GraphElement): GraphElementTuple = {
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): GraphElementTuple = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+
+    toGraphElementTuple(element.toLogString().split("\t"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
index 32bab6a..9d2656c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.s2jobs.serde.writer
 
 import org.apache.s2graph.core.{GraphElement, S2Graph}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index 3d0aefb..7b52415 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,10 +19,11 @@
 
 package org.apache.s2graph.s2jobs.task
 
-import org.apache.s2graph.core.Management
-import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.HFileGenerator
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.s2graph.core.{GraphElement, Management}
+import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
+import org.apache.s2graph.s2jobs.serde.writer.GraphElementDataFrameWriter
+import org.apache.spark.sql.{DataFrame, DataFrameWriter, SparkSession}
 
 
 /**
@@ -111,15 +112,29 @@ class S2GraphSource(conf: TaskConf) extends Source(conf) {
   override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
 
   override def toDF(ss: SparkSession): DataFrame = {
-    val options = TaskConf.toGraphFileOptions(conf)
-    val config = Management.toConfig(options.toConfigParams)
+    import ss.sqlContext.implicits._
+    val mergedConf = TaskConf.parseHBaseConfigs(conf) ++ TaskConf.parseMetaStoreConfigs(conf)
+    val config = Management.toConfig(mergedConf)
 
     val snapshotPath = conf.options("hbase.rootdir")
     val restorePath = conf.options("restore.path")
     val tableNames = conf.options("hbase.table.names").split(",")
     val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
     val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
+    val labelMapping = Map.empty[String, String]
+    val buildDegree = conf.options.getOrElse("build.degree", "false").toBoolean
+    val elementType = conf.options.getOrElse("element.type", "IndexEdge")
 
-    HFileGenerator.tableSnapshotDump(ss, config, options, snapshotPath, restorePath, tableNames, columnFamily, batchSize)
+    val cells = HFileGenerator.tableSnapshotDump(ss, config, snapshotPath,
+      restorePath, tableNames, columnFamily, elementType, batchSize, labelMapping, buildDegree)
+
+
+    implicit val reader = new S2GraphCellReader(elementType)
+    implicit val writer = new GraphElementDataFrameWriter
+
+    val transformer = new SparkBulkLoaderTransformer(config, labelMapping, buildDegree)
+    val kvs = transformer.transform(cells)
+
+    kvs.toDF(GraphElementDataFrameWriter.Fields: _*)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
index ddd56bf..6ba2468 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Task.scala
@@ -29,6 +29,15 @@ object TaskConf {
 
     GraphFileOptions.toOption(args)
   }
+
+  def parseHBaseConfigs(taskConf: TaskConf): Map[String, String] = {
+    taskConf.options.filterKeys(_.startsWith("hbase."))
+  }
+
+  def parseMetaStoreConfigs(taskConf: TaskConf): Map[String, String] = {
+    taskConf.options.filterKeys(_.startsWith("db."))
+  }
+
 }
 
 case class TaskConf(name:String, `type`:String, inputs:Seq[String] = Nil, options:Map[String, String] = Map.empty)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
index f2b0102..6b21cfc 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -29,7 +29,7 @@ class S2GraphHelperTest extends BaseSparkTest {
     println(args)
 
     val taskConf = TaskConf("dummy", "sink", Nil, args)
-    val graphFileOptions = S2GraphHelper.toGraphFileOptions(taskConf)
+    val graphFileOptions = TaskConf.toGraphFileOptions(taskConf)
     println(graphFileOptions)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 991897b..dfdb595 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -190,7 +190,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
     val input = sc.parallelize(bulkVertexLs)
 
     HFileGenerator.generate(sc, s2Config, input, options)
-    HFileGenerator.loadIncrementHFile(options)
+    HFileGenerator.loadIncrementalHFiles(options)
 
     val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
     val json = PostProcess.verticesToJson(s2Vertices)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a0ce6f8e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
index d2788c2..6e712a6 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -19,8 +19,11 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.s2graph.core.S2EdgeLike
-import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.core.storage.hbase.{AsynchbaseStorage, AsynchbaseStorageManagement}
+import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
+import org.apache.s2graph.s2jobs.{BaseSparkTest, S2GraphHelper}
 import org.apache.spark.sql.DataFrame
 
 import scala.collection.JavaConverters._
@@ -42,28 +45,57 @@ class SourceTest extends BaseSparkTest {
     }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
   }
 
-  test("S2graphSink writeBatch.") {
+
+  test("S2GraphSource toDF") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+    val snapshotTableName = options.tableName + "-snapshot"
 
+    // 1. run S2GraphSink to build(not actually load by using LoadIncrementalLoad) bulk load file.
     val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
 
     val df = toDataFrame(Seq(bulkEdgeString))
+
+    val reader = new RowBulkFormatReader
+
+    val inputEdges = df.collect().flatMap(reader.read(s2)(_))
+
     val args = options.toCommand.grouped(2).map { kv =>
       kv.head -> kv.last
-    }.toMap
+    }.toMap ++ Map("writeMethod" -> "bulk", "runLoadIncrementalHFiles" -> "true")
 
     val conf = TaskConf("test", "sql", Seq("input"), args)
 
     val sink = new S2GraphSink("testQuery", conf)
     sink.write(df)
 
+    // 2. create snapshot if snapshot is not exist to test TableSnapshotInputFormat.
+    s2.defaultStorage.management.asInstanceOf[AsynchbaseStorageManagement].withAdmin(s2.config) { admin =>
+      import scala.collection.JavaConverters._
+      if (admin.listSnapshots(snapshotTableName).asScala.toSet(snapshotTableName))
+        admin.deleteSnapshot(snapshotTableName)
+
+      admin.snapshot(snapshotTableName, TableName.valueOf(options.tableName))
+    }
+
+    // 3. Decode S2GraphSource to parse HFile
+    val metaAndHBaseArgs = options.toConfigParams
+    val hbaseConfig = HBaseConfiguration.create(spark.sparkContext.hadoopConfiguration)
+
     val dumpArgs = Map(
-      "hbase.rootdir" -> "",
-      "restore.path" -> "",
-      "hbase.table.names" -> Seq(options.tableName).mkString(","),
-      "hbase.table.cf" -> "e"
-    )
+      "hbase.rootdir" -> hbaseConfig.get("hbase.rootdir"),
+      "restore.path" -> "/tmp/hbase_restore",
+      "hbase.table.names" -> s"${snapshotTableName}",
+      "hbase.table.cf" -> "e",
+      "element.type" -> "IndexEdge"
+    ) ++ metaAndHBaseArgs
+
    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
 
-    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
-    s2Edges.foreach { edge => println(edge) }
+    val source = new S2GraphSource(dumpConf)
+    val realDF = source.toDF(spark)
+    val outputEdges = realDF.collect().flatMap(reader.read(s2)(_))
+
+    inputEdges.foreach { e => println(s"[Input]: $e")}
+    outputEdges.foreach { e => println(s"[Output]: $e")}
+
+    inputEdges shouldBe outputEdges
  }
 }
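
----------------------------------------------------------------------

Note on the toLogString changes above: edge and vertex properties are now
rendered as a JSON object (via JSONParser.anyValToJsValue) instead of the raw
propsWithTs / value maps, so a dumped line matches the tab-separated bulk
format that the readers consume, e.g. (values taken from SourceTest):

    1416236400000	insert	edge	a	b	friends	{"since":1316236400000,"score":10}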
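A minimal sketch of how the new S2GraphSource is driven from a TaskConf,
mirroring the options exercised in SourceTest; the snapshot name, restore
path, and app name below are illustrative, not part of this commit:

    import org.apache.s2graph.s2jobs.task.{S2GraphSource, TaskConf}
    import org.apache.spark.sql.SparkSession

    object SnapshotDumpExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("snapshot-dump").getOrCreate()

        // Option keys consumed by S2GraphSource.toDF; "hbase.*" and "db.*"
        // entries are additionally forwarded into the graph config by
        // TaskConf.parseHBaseConfigs / TaskConf.parseMetaStoreConfigs.
        val dumpArgs = Map(
          "hbase.rootdir"     -> "hdfs:///hbase",       // snapshot root dir (illustrative)
          "restore.path"      -> "/tmp/hbase_restore",  // scratch dir for TableSnapshotInputFormat
          "hbase.table.names" -> "my_table-snapshot",   // comma-separated snapshot names (illustrative)
          "hbase.table.cf"    -> "e",                   // "e" for edges, "v" for vertices
          "element.type"      -> "IndexEdge"            // IndexEdge | SnapshotEdge | Vertex
        )

        val source = new S2GraphSource(TaskConf("dump", "sql", Seq("input"), dumpArgs))
        val df = source.toDF(spark)
        df.show(10, truncate = false)
      }
    }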
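Because tableSnapshotDump now returns the raw RDD[Seq[Cell]] and the
transformer no longer depends on GraphFileOptions, the cell-to-DataFrame step
can also be reused on its own. A sketch under the same assumptions, isolating
what S2GraphSource.toDF composes:

    import com.typesafe.config.Config
    import org.apache.hadoop.hbase.Cell
    import org.apache.s2graph.s2jobs.loader.SparkBulkLoaderTransformer
    import org.apache.s2graph.s2jobs.serde.reader.S2GraphCellReader
    import org.apache.s2graph.s2jobs.serde.writer.GraphElementDataFrameWriter
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}

    def cellsToDF(ss: SparkSession, config: Config,
                  cells: RDD[Seq[Cell]], elementType: String): DataFrame = {
      import ss.sqlContext.implicits._

      // Deserialize each group of HBase cells into a GraphElement,
      // validating the column family against the requested element type.
      implicit val reader = new S2GraphCellReader(elementType)
      // Render every element as the 8-field tuple behind Fields.
      implicit val writer = new GraphElementDataFrameWriter

      // labelMapping/buildDegree are passed directly now; GraphFileOptions
      // is only needed by the legacy auxiliary constructor.
      val transformer = new SparkBulkLoaderTransformer(config, Map.empty, buildDegree = false)
      val kvs = transformer.transform(cells)

      kvs.toDF(GraphElementDataFrameWriter.Fields: _*)
    }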
