Repository: incubator-s2graph Updated Branches: refs/heads/master 5b22148c6 -> 3332f6bc1
abstract read/write of user provided class. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a52adab0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a52adab0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a52adab0 Branch: refs/heads/master Commit: a52adab0ad01c0d2e60b484cc07ca112a68e98dc Parents: ddfd10d Author: DO YUNG YOON <[email protected]> Authored: Fri Mar 16 15:47:28 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Mar 16 15:51:59 2018 +0900 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/core/S2Graph.scala | 2 + .../s2graph/core/S2VertexPropertyHelper.scala | 19 ++ .../apache/s2graph/core/storage/SKeyValue.scala | 5 +- .../hbase/AsynchbaseStorageManagement.scala | 2 +- .../org/apache/s2graph/s2jobs/DegreeKey.scala | 52 ++++ .../apache/s2graph/s2jobs/S2GraphHelper.scala | 50 ++++ .../s2jobs/loader/GraphFileGenerator.scala | 8 +- .../s2graph/s2jobs/loader/HFileGenerator.scala | 126 +-------- .../s2jobs/loader/HFileMRGenerator.scala | 34 +-- .../loader/LocalBulkLoaderTransformer.scala | 61 ++++ .../s2jobs/loader/RawFileGenerator.scala | 28 +- .../loader/SparkBulkLoaderTransformer.scala | 76 +++++ .../s2jobs/serde/GraphElementReadable.scala | 26 ++ .../s2jobs/serde/GraphElementWritable.scala | 26 ++ .../s2graph/s2jobs/serde/Transformer.scala | 50 ++++ .../serde/reader/TsvBulkFormatReader.scala | 29 ++ .../s2jobs/serde/writer/KeyValueWriter.scala | 33 +++ .../apache/s2graph/s2jobs/BaseSparkTest.scala | 145 ++++++++++ .../s2jobs/dump/GraphFileDumperTest.scala | 97 +++++++ .../s2jobs/loader/GraphFileGeneratorTest.scala | 277 +++++++------------ 20 files changed, 801 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index c3d3887..09f9c7c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -176,6 +176,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override val config = _config.withFallback(S2Graph.DefaultConfig) + val storageBackend = Try { config.getString("s2graph.storage.backend") }.getOrElse("hbase") + Model.apply(config) Model.loadCache() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala index bed69ef..bdb3c00 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.s2graph.core import org.apache.s2graph.core.mysqls.ColumnMeta http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala index 57adc8a..20ca5e6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -25,8 +25,9 @@ import org.hbase.async.KeyValue object SKeyValue { - val EdgeCf = "e".getBytes("UTF-8") - val VertexCf = "v".getBytes("UTF-8") + val SnapshotEdgeCf = "s".getBytes(StandardCharsets.UTF_8) + val EdgeCf = "e".getBytes(StandardCharsets.UTF_8) + val VertexCf = "v".getBytes(StandardCharsets.UTF_8) val Put = 1 val Delete = 2 val Increment = 3 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala index 8475ba6..f504c75 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala @@ -250,7 +250,7 @@ class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClie conn.getAdmin } - private def withAdmin(config: Config)(op: Admin => Unit): Unit = { + def withAdmin(config: Config)(op: Admin => Unit): Unit = { val admin = getAdmin(config) try { op(admin) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala new file mode 100644 index 0000000..f1efac3 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs + +import org.apache.s2graph.core.{GraphElement, S2Edge, S2Graph, S2Vertex} +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.hadoop.hbase.{KeyValue => HKeyValue} + + +object DegreeKey { + def fromGraphElement(s2: S2Graph, + element: GraphElement, + labelMapping: Map[String, String] = Map.empty): Option[DegreeKey] = { + element match { + case v: S2Vertex => None + case e: S2Edge => + val newLabel = labelMapping.getOrElse(e.innerLabel.label, e.innerLabel.label) + val degreeKey = DegreeKey(e.srcVertex.innerIdVal.toString, newLabel, e.getDirection()) + Option(degreeKey) + case _ => None + } + } + + def toSKeyValue(s2: S2Graph, + degreeKey: DegreeKey, + count: Long): Seq[SKeyValue] = { + S2GraphHelper.buildDegreePutRequests(s2, degreeKey.vertexIdStr, degreeKey.labelName, degreeKey.direction, count) + } + + def toKeyValue(s2: S2Graph, degreeKey: DegreeKey, count: Long): Seq[HKeyValue] = { + toSKeyValue(s2, degreeKey, count).map(skv => new HKeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)) + } +} + +case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala index ef76608..3f80e8f 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala @@ -21,6 +21,10 @@ package org.apache.s2graph.s2jobs import com.typesafe.config.Config import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId} +import play.api.libs.json.Json import scala.concurrent.ExecutionContext @@ -28,4 +32,50 @@ object S2GraphHelper { def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = { new S2Graph(config) } + + def buildDegreePutRequests(s2: S2Graph, + vertexId: String, + labelName: String, + direction: String, + degreeVal: Long): Seq[SKeyValue] = { + val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB.")) + val dir = GraphUtil.directions(direction) + val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse { + throw new RuntimeException(s"$vertexId can not be converted into innerval") + } + val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal)) + + val ts = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) + val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs) + + 
edge.edgesWithIndex.flatMap { indexEdge => + s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues + } + } + + private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = { + val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) + + val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues + val indexEdgeKeyValues = relEdges.flatMap { edge => + edge.edgesWithIndex.flatMap { indexEdge => + s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues + } + } + + snapshotEdgeKeyValues ++ indexEdgeKeyValues + } + + def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = { + if (element.isInstanceOf[S2Edge]) { + val edge = element.asInstanceOf[S2Edge] + insertBulkForLoaderAsync(s2, edge, autoEdgeCreate) + } else if (element.isInstanceOf[S2Vertex]) { + val vertex = element.asInstanceOf[S2Vertex] + s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues + } else { + Nil + } + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala index 79eca36..5f1b940 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileGenerator.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,11 @@ object GraphFileGenerator { conf.setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) + val input = sc.textFile(options.input) + options.method match { - case "MR" => HFileMRGenerator.generate(sc, s2Config, input, options) + // case "MR" => HFileMRGenerator.generate(sc, s2Config, input, options) case "SPARK" => HFileGenerator.generate(sc, s2Config, input, options) case _ => throw new IllegalArgumentException("only supported type is MR/SPARK.") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index acd3886..b4ac51f 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,102 +28,19 @@ import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName} -import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement -import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId} -import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.s2jobs.S2GraphHelper -import org.apache.s2graph.s2jobs.spark._ +import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.hbase.async.PutRequest -import play.api.libs.json.Json -object HFileGenerator extends RawFileGenerator { +object HFileGenerator extends RawFileGenerator[String, KeyValue] { import scala.collection.JavaConverters._ - private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = { - val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) - - buildPutRequests(s2, edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e => - e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(s2, indexEdge) } - } - } - - def buildPutRequests(s2: S2Graph, snapshotEdge: SnapshotEdge): List[PutRequest] = { - val kvs = s2.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList - kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } - } - - def buildPutRequests(s2: S2Graph, indexEdge: IndexEdge): List[PutRequest] = { - val kvs = s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList - kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } - } - - def buildDegreePutRequests(s2: S2Graph, vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = { - val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB.")) - val dir = GraphUtil.directions(direction) - val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse { - throw new RuntimeException(s"$vertexId can not be converted into innerval") - } - val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal)) - - val ts = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs) - - edge.edgesWithIndex.flatMap { indexEdge => - s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => - new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp) - } - } - } - - def toKeyValues(s2: S2Graph, degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = { - val kvs = for { - (key, value) <- degreeKeyVals - putRequest <- buildDegreePutRequests(s2, key.vertexIdStr, 
key.labelName, key.direction, value) - } yield { - val p = putRequest - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - kv - } - kvs.toIterator - } - - def toKeyValues(s2: S2Graph, strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { - val kvList = new java.util.ArrayList[KeyValue] - for (s <- strs) { - val elementList = s2.elementBuilder.toGraphElement(s, labelMapping).toSeq - for (element <- elementList) { - if (element.isInstanceOf[S2Edge]) { - val edge = element.asInstanceOf[S2Edge] - val putRequestList = insertBulkForLoaderAsync(s2, edge, autoEdgeCreate) - for (p <- putRequestList) { - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - kvList.add(kv) - } - } else if (element.isInstanceOf[S2Vertex]) { - val vertex = element.asInstanceOf[S2Vertex] - val putRequestList = s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues.map { kv => - new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) - } - for (p <- putRequestList) { - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - kvList.add(kv) - } - } - } - } - kvList.iterator().asScala - } - - def getTableStartKeys(hbaseConfig: Configuration, tableName: TableName): Array[Array[Byte]] = { val conn = ConnectionFactory.createConnection(hbaseConfig) val regionLocator = conn.getRegionLocator(tableName) + regionLocator.getStartKeys } @@ -132,7 +49,6 @@ object HFileGenerator extends RawFileGenerator { hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum) hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName) -// hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}") hbaseConf } @@ -158,34 +74,12 @@ object HFileGenerator extends RawFileGenerator { } } - def transfer(sc: SparkContext, - s2Config: Config, - input: RDD[String], - graphFileOptions: GraphFileOptions): RDD[KeyValue] = { - val kvs = input.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(s2Config) - - val s = toKeyValues(s2, iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate) - s - } - - if (!graphFileOptions.buildDegree) kvs - else { - kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) => - agg + current - }.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(s2Config) - - toKeyValues(s2, iter.toSeq) - } - } - } - def generateHFile(sc: SparkContext, s2Config: Config, kvs: RDD[KeyValue], options: GraphFileOptions): Unit = { val hbaseConfig = toHBaseConfig(options) + val startKeys = if (options.incrementalLoad) { // need hbase connection to existing table to figure out the ranges of regions. 
@@ -207,6 +101,7 @@ object HFileGenerator extends RawFileGenerator { val compressionAlgorithmClass = Algorithm.valueOf(options.compressionAlgorithm).getName.toUpperCase val familyOptions = new FamilyHFileWriteOptions(compressionAlgorithmClass, BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase) + val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions) @@ -217,7 +112,10 @@ object HFileGenerator extends RawFileGenerator { config: Config, rdd: RDD[String], _options: GraphFileOptions): Unit = { - val kvs = transfer(sc, config, rdd, _options) - generateHFile(sc, config, kvs, _options) + val transformer = new SparkBulkLoaderTransformer(config, _options) + val kvs = transformer.transform(rdd).flatMap(kvs => kvs) + + HFileGenerator.generateHFile(sc, config, kvs, _options) } } + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala index ee4c338..3502bee 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,13 +34,10 @@ import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat} import org.apache.hadoop.mapreduce.{Job, Mapper} -import org.apache.s2graph.core.GraphExceptions.LabelNotExistException -import org.apache.s2graph.core.mysqls.Label -import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -object HFileMRGenerator extends RawFileGenerator { +object HFileMRGenerator extends RawFileGenerator[String, KeyValue] { val DefaultBlockSize = 32768 val DefaultConfig = Map( "yarn.app.mapreduce.am.resource.mb" -> 4096, @@ -69,25 +66,7 @@ object HFileMRGenerator extends RawFileGenerator { } def getStartKeys(numRegions: Int): Seq[ImmutableBytesWritable] = { - val startKey = AsynchbaseStorageManagement.getStartKey(numRegions) - val endKey = AsynchbaseStorageManagement.getEndKey(numRegions) - if (numRegions < 3) { - throw new IllegalArgumentException("Must create at least three regions") - } - else if (Bytes.compareTo(startKey, endKey) >= 0) { - throw new IllegalArgumentException("Start key must be smaller than end key") - } - val empty = new Array[Byte](0) - val results = if (numRegions == 3) { - Seq(empty, startKey, endKey) - } else { - val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3) - if (splitKeys == null || splitKeys.length != numRegions - 1) { - throw new IllegalArgumentException("Unable to split key range into enough regions") - } - Seq(empty) ++ splitKeys.toSeq 
- } - results.map(new ImmutableBytesWritable(_)) + HFileGenerator.getStartKeys(numRegions).map(new ImmutableBytesWritable(_)) } def sortKeyValues(hbaseConf: Configuration, @@ -124,8 +103,9 @@ object HFileMRGenerator extends RawFileGenerator { def transfer(sc: SparkContext, s2Config: Config, input: RDD[String], - graphFileOptions: GraphFileOptions): RDD[KeyValue] = { - HFileGenerator.transfer(sc, s2Config, input, graphFileOptions) + options: GraphFileOptions): RDD[KeyValue] = { + val transformer = new SparkBulkLoaderTransformer(s2Config, options) + transformer.transform(input).flatMap(kvs => kvs) } override def generate(sc: SparkContext, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala new file mode 100644 index 0000000..7d405a6 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.loader + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.KeyValue +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.serde.Transformer +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} + +import scala.concurrent.ExecutionContext + +class LocalBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] { + val s2: S2Graph = S2GraphHelper.initS2Graph(config) + + override val reader = new TsvBulkFormatReader + override val writer = new KeyValueWriter + + override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_)) + + override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_)) + + override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = { + val degrees = elements.flatMap { element => + DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) + }.groupBy(_._1).mapValues(_.map(_._2).sum) + + degrees.toSeq.map { case (degreeKey, count) => + DegreeKey.toKeyValue(s2, degreeKey, count) + } + } + + override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = { + val elements = read(input) + val kvs = write(elements) + + val degrees = if (options.buildDegree) buildDegrees(elements) else Nil + + kvs ++ degrees + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala index 1613f20..ef1aaf6 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala @@ -20,32 +20,12 @@ package org.apache.s2graph.s2jobs.loader import com.typesafe.config.Config -import org.apache.s2graph.core.GraphUtil import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) - -trait RawFileGenerator { +trait RawFileGenerator[S, T] { def generate(sc: SparkContext, - config:Config, - rdd: RDD[String], - _options:GraphFileOptions) - - def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = { - for { - msg <- msgs - tokens = GraphUtil.split(msg) - if tokens(2) == "e" || tokens(2) == "edge" - tempDirection = if (tokens.length == 7) "out" else tokens(7) - direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - reverseDirection = if (direction == "out") "in" else "out" - convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5)) - (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4)) - degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction) - degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection) - extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil - output <- List(degreeKey -> 1L) ++ extra - } yield output - } + config: Config, + rdd: RDD[S], + _options: GraphFileOptions): Unit } 
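----------------------------------------------------------------------
Usage sketch (illustrative only, not part of this commit): how the new
Transformer pieces compose end to end. It assumes a reachable metastore
and an already-created "friends" label; the GraphFileOptions values and
the bulk-format line are copied from BaseSparkTest / GraphFileGeneratorTest
later in this patch.

  import org.apache.s2graph.core.Management
  import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, LocalBulkLoaderTransformer}
  import scala.concurrent.ExecutionContext.Implicits.global

  // Same illustrative option values as BaseSparkTest below.
  val options = GraphFileOptions(
    input = "/tmp/test.txt",
    tempDir = "/tmp/bulkload_tmp",
    output = "/tmp/s2graph_bulkload",
    zkQuorum = "localhost",
    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
    dbUser = "sa",
    dbPassword = "sa",
    dbDriver = "org.h2.Driver",
    tableName = "s2graph",
    maxHFilePerRegionServer = 1,
    numRegions = 3,
    compressionAlgorithm = "NONE",
    buildDegree = false,
    autoEdgeCreate = false)

  val config = Management.toConfig(options.toConfigParams)

  // One TSV bulk-format line; TsvBulkFormatReader parses it into a
  // GraphElement and KeyValueWriter serializes it into HBase KeyValues.
  val bulkEdge = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"

  val transformer = new LocalBulkLoaderTransformer(config, options)
  val kvs: Seq[org.apache.hadoop.hbase.KeyValue] =
    transformer.transform(Seq(bulkEdge)).flatten

  // The Spark variant is identical except the container type is RDD:
  //   new SparkBulkLoaderTransformer(config, options).transform(rdd)
----------------------------------------------------------------------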
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala new file mode 100644 index 0000000..cd991e1 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.loader + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.{KeyValue => HKeyValue} +import org.apache.s2graph.core.GraphElement +import org.apache.s2graph.s2jobs.serde.Transformer +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} +import org.apache.spark.rdd.RDD + +class SparkBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] { + val reader = new TsvBulkFormatReader + + val writer = new KeyValueWriter + + override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { line => + reader.read(s2)(line) + } + } + + override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map(writer.write(s2)(_)) + } + + override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = { + val degrees = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { element => + DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) + } + }.reduceByKey(_ + _) + + degrees.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map { case (degreeKey, count) => + DegreeKey.toKeyValue(s2, degreeKey, count) + } + } + } + + override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = { + val elements = read(input) + val kvs = write(elements) + + if (options.buildDegree) kvs ++ buildDegrees(elements) + else kvs + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala new file mode 100644 index 0000000..0544a84 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementReadable.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.serde + +import org.apache.s2graph.core.{GraphElement, S2Graph} + +trait GraphElementReadable[S] extends Serializable { + def read(graph: S2Graph)(data: S): Option[GraphElement] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala new file mode 100644 index 0000000..ae082d8 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/GraphElementWritable.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.serde + +import org.apache.s2graph.core.{GraphElement, S2Graph} + +trait GraphElementWritable[T] extends Serializable { + def write(s2: S2Graph)(element: GraphElement): T +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala new file mode 100644 index 0000000..3902c63 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/Transformer.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.serde + +import com.typesafe.config.Config +import org.apache.s2graph.core.GraphElement +import org.apache.s2graph.s2jobs.loader.GraphFileOptions + +/** + * Defines the serialize/deserialize pipeline used by the bulk loaders: + * Source -> GraphElement (read), then + * GraphElement -> Target (write). + * + * @tparam S : Source class. ex) String, RDF.Statement, ... + * @tparam T : Target class. ex) KeyValue, Array[Byte], String, ... + * @tparam M : Container type. ex) RDD, Seq, List, ... + */ +trait Transformer[S, T, M[_]] extends Serializable { + val config: Config + val options: GraphFileOptions + + val reader: GraphElementReadable[S] + + val writer: GraphElementWritable[T] + + def read(input: M[S]): M[GraphElement] + + def write(elements: M[GraphElement]): M[T] + + def buildDegrees(elements: M[GraphElement]): M[T] + + def transform(input: M[S]): M[T] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala new file mode 100644 index 0000000..5465517 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/TsvBulkFormatReader.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.serde.reader + +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.serde.GraphElementReadable + +class TsvBulkFormatReader extends GraphElementReadable[String] { + override def read(graph: S2Graph)(data: String): Option[GraphElement] = { + graph.elementBuilder.toGraphElement(data) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala new file mode 100644 index 0000000..02034af --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.serde.writer + +import org.apache.hadoop.hbase.KeyValue +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.S2GraphHelper +import org.apache.s2graph.s2jobs.serde.GraphElementWritable + +class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] { + override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = { + S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv => + new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala new file mode 100644 index 0000000..78000d4 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs + +import java.io.{File, PrintWriter} + +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} +import org.apache.s2graph.core.{Management, S2Graph} +import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.s2jobs.loader.GraphFileOptions +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +import scala.util.Try + +class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll { + private val master = "local[2]" + private val appName = "example-spark" + + protected var sc: SparkContext = _ + protected val options = GraphFileOptions( + input = "/tmp/test.txt", + tempDir = "/tmp/bulkload_tmp", + output = "/tmp/s2graph_bulkload", + zkQuorum = "localhost", + dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL", + dbUser = "sa", + dbPassword = "sa", + dbDriver = "org.h2.Driver", + tableName = "s2graph", + maxHFilePerRegionServer = 1, + numRegions = 3, + compressionAlgorithm = "NONE", + buildDegree = false, + autoEdgeCreate = false) + + protected val s2Config = Management.toConfig(options.toConfigParams) + + protected val tableName = options.tableName + protected val schemaVersion = HBaseType.DEFAULT_VERSION + protected val compressionAlgorithm: String = options.compressionAlgorithm + protected var s2: S2Graph = _ + + private val testLines = Seq( + "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广东省\",\"location_often_city\":\"深圳市\",\"location_often_days\":6,\"location_last_province\":\"广东省\",\"location_last_city\":\"深圳市\",\"fimei_legality\":3}" + ) + + override def beforeAll(): Unit = { + // initialize spark context. 
+ val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + sc = new SparkContext(conf) + + s2 = S2GraphHelper.initS2Graph(s2Config) + initTestDataFile + } + + override def afterAll(): Unit = { + if (sc != null) sc.stop() + if (s2 != null) s2.shutdown() + } + + def initTestDataFile: Unit = { + deleteRecursively(new File(options.input)) + writeToFile(options.input)(testLines) + } + + def initTestEdgeSchema(s2: S2Graph, tableName: String, + schemaVersion: String = HBaseType.DEFAULT_VERSION, + compressionAlgorithm: String = "none"): Label = { + import scala.collection.JavaConverters._ + /* initialize model for test */ + val management = s2.management + + val service = management.createService(serviceName = "s2graph", cluster = "localhost", + hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz") + + val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", Nil) + + Try { + management.createLabel("friends", serviceColumn, serviceColumn, isDirected = true, + serviceName = service.serviceName, indices = new java.util.ArrayList[Index], + props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")).asJava, consistencyLevel = "strong", hTableName = tableName, + hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = "") + } + + Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized.")) + } + + def initTestVertexSchema(s2: S2Graph): ServiceColumn = { + import scala.collection.JavaConverters._ + /* initialize model for test */ + val management = s2.management + + val service = management.createService(serviceName = "device_profile", cluster = "localhost", + hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz") + + management.createServiceColumn(service.serviceName, "imei", "string", + Seq( + Prop(name = "first_time", defaultValue = "''", dataType = "string"), + Prop(name = "last_time", defaultValue = "''", dataType = "string"), + Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"), + Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"), + Prop(name = "active_months", defaultValue = "-1", dataType = "integer"), + Prop(name = "fua", defaultValue = "''", dataType = "string"), + Prop(name = "location_often_province", defaultValue = "''", dataType = "string"), + Prop(name = "location_often_city", defaultValue = "''", dataType = "string"), + Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"), + Prop(name = "location_last_province", defaultValue = "''", dataType = "string"), + Prop(name = "location_last_city", defaultValue = "''", dataType = "string"), + Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer") + )) + } + + def writeToFile(fileName: String)(lines: Seq[String]): Unit = { + val writer = new PrintWriter(fileName) + lines.foreach(line => writer.write(line + "\n")) + writer.close + } + + def deleteRecursively(file: File): Unit = { + if (file.isDirectory) file.listFiles.foreach(deleteRecursively) + if (file.exists && !file.delete) throw new Exception(s"Unable to delete ${file.getAbsolutePath}") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala ---------------------------------------------------------------------- diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala new file mode 100644 index 0000000..81566f9 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/dump/GraphFileDumperTest.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//package org.apache.s2graph.s2jobs.dump +// +//import org.apache.s2graph.core._ +//import org.apache.s2graph.core.types.HBaseType +//import org.apache.s2graph.s2jobs.S2GraphHelper +//import org.apache.s2graph.s2jobs.loader.GraphFileOptions +//import org.apache.spark.{SparkConf, SparkContext} +//import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +//import play.api.libs.json.Json +// +//class GraphFileDumperTest extends FunSuite with Matchers with BeforeAndAfterAll { +// private val master = "local[2]" +// private val appName = "example-spark" +// +// private var sc: SparkContext = _ +// val options = GraphFileOptions( +// input = "/tmp/imei-20.txt", +// tempDir = "/tmp/bulkload_tmp", +// output = "/tmp/s2graph_bulkload", +// zkQuorum = "localhost", +// dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL", +// dbUser = "sa", +// dbPassword = "sa", +// dbDriver = "org.h2.Driver", +// tableName = "s2graph", +// maxHFilePerRegionServer = 1, +// numRegions = 3, +// compressionAlgorithm = "NONE", +// buildDegree = false, +// autoEdgeCreate = false) +// +// val s2Config = Management.toConfig(options.toConfigParams) +// +// val tableName = options.tableName +// val schemaVersion = HBaseType.DEFAULT_VERSION +// val compressionAlgorithm: String = options.compressionAlgorithm +// var s2: S2Graph = _ +// +// override def beforeAll(): Unit = { +// // initialize spark context. 
+// val conf = new SparkConf() +// .setMaster(master) +// .setAppName(appName) +// +// sc = new SparkContext(conf) +// +// s2 = S2GraphHelper.initS2Graph(s2Config) +// } +// +// override def afterAll(): Unit = { +// if (sc != null) sc.stop() +// if (s2 != null) s2.shutdown() +// } +// +// test("test dump.") { +// implicit val graph = s2 +// val snapshotPath = "/usr/local/var/hbase" +// val restorePath = "/tmp/hbase_restore" +// val snapshotTableNames = Seq("s2graph-snapshot") +// +// val cellLs = HFileDumper.toKeyValue(sc, snapshotPath, restorePath, +// snapshotTableNames, columnFamily = "v") +// +// val kvsLs = cellLs.map(CanGraphElement.cellsToSKeyValues).collect() +// +// val elements = kvsLs.flatMap { kvs => +// CanGraphElement.sKeyValueToGraphElement(s2)(kvs) +// } +// +// elements.foreach { element => +// val v = element.asInstanceOf[S2VertexLike] +// val json = Json.prettyPrint(PostProcess.s2VertexToJson(v).get) +// +// println(json) +// } +// +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a52adab0/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala index 32c5a25..3fbbd88 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala @@ -19,145 +19,75 @@ package org.apache.s2graph.s2jobs.loader -import java.io.{File, PrintWriter} -import java.util - import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles import org.apache.hadoop.util.ToolRunner -import org.apache.s2graph.core.{Management, PostProcess, S2Graph, S2VertexLike} -import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} -import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} -import org.apache.s2graph.core.storage.CanSKeyValue -import org.apache.s2graph.core.types.HBaseType -import org.apache.s2graph.s2jobs.S2GraphHelper -import org.apache.spark.{SparkConf, SparkContext} -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import org.apache.s2graph.core.{PostProcess, S2VertexLike} +import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue} +import org.apache.s2graph.s2jobs.BaseSparkTest import play.api.libs.json.Json import scala.io.Source -import scala.util.Try - -object GraphFileGeneratorTest { - def initTestEdgeSchema(s2: S2Graph, tableName: String, - schemaVersion: String = HBaseType.DEFAULT_VERSION, - compressionAlgorithm: String = "none"): Label = { - import scala.collection.JavaConverters._ - /* initialize model for test */ - val management = s2.management - - val service = management.createService(serviceName = "s2graph", cluster = "localhost", - hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz") - - val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", Nil) - - Try { - management.createLabel("friends", serviceColumn, serviceColumn, isDirected = true, - serviceName = service.serviceName, indices = new java.util.ArrayList[Index], - props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")).asJava, consistencyLevel = "strong", hTableName = tableName, - hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm 
= compressionAlgorithm, options = "") - } - Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized.")) +class GraphFileGeneratorTest extends BaseSparkTest { + import scala.concurrent.ExecutionContext.Implicits.global + import org.apache.hadoop.hbase.{KeyValue => HKeyValue} + + def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = { + transformerMode match { + case "spark" => + val input = sc.parallelize(edges) + val transformer = new SparkBulkLoaderTransformer(s2Config, options) + val kvs = transformer.transform(input) + kvs.flatMap { kvs => + kvs.map { kv => + CanSKeyValue.hbaseKeyValue.toSKeyValue(kv) + } + }.collect().toList + + case "local" => + val input = edges + val transformer = new LocalBulkLoaderTransformer(s2Config, options) + val kvs = transformer.transform(input) + kvs.flatMap { kvs => + kvs.map { kv => + CanSKeyValue.hbaseKeyValue.toSKeyValue(kv) + } + }.toList + } } + test("test generateKeyValues edge only. SparkBulkLoaderTransformer") { + val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) + /* end of initialize model */ - def initTestVertexSchema(s2: S2Graph): ServiceColumn = { - import scala.collection.JavaConverters._ - /* initialize model for test */ - val management = s2.management - - val service = management.createService(serviceName = "device_profile", cluster = "localhost", - hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz") - - management.createServiceColumn(service.serviceName, "imei", "string", - Seq( - Prop(name = "first_time", defaultValue = "''", dataType = "string"), - Prop(name = "last_time", defaultValue = "''", dataType = "string"), - Prop(name = "total_active_days", defaultValue = "-1", dataType = "integer"), - Prop(name = "query_amount", defaultValue = "-1", dataType = "integer"), - Prop(name = "active_months", defaultValue = "-1", dataType = "integer"), - Prop(name = "fua", defaultValue = "''", dataType = "string"), - Prop(name = "location_often_province", defaultValue = "''", dataType = "string"), - Prop(name = "location_often_city", defaultValue = "''", dataType = "string"), - Prop(name = "location_often_days", defaultValue = "-1", dataType = "integer"), - Prop(name = "location_last_province", defaultValue = "''", dataType = "string"), - Prop(name = "location_last_city", defaultValue = "''", dataType = "string"), - Prop(name = "fimei_legality", defaultValue = "-1", dataType = "integer") - )) - } + val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}" - def writeToFile(fileName: String)(lines: Seq[String]): Unit = { - val writer = new PrintWriter(fileName) - lines.foreach(line => writer.write(line + "\n")) - writer.close - } + val transformerMode = "spark" + val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString)) - def deleteRecursively(file: File): Unit = { - if (file.isDirectory) file.listFiles.foreach(deleteRecursively) - if (file.exists && !file.delete) throw new Exception(s"Unable to delete ${file.getAbsolutePath}") - } -} + val serDe = s2.defaultStorage.serDe -class GraphFileGeneratorTest extends FunSuite with Matchers with BeforeAndAfterAll { - import GraphFileGeneratorTest._ - import scala.collection.JavaConverters._ - - private val master = "local[2]" - private val appName = "example-spark" - - private var sc: SparkContext = _ - val options = GraphFileOptions( - input = "/tmp/imei-20.txt", - tempDir = "/tmp/bulkload_tmp", - 
output = "/tmp/s2graph_bulkload", - zkQuorum = "localhost", - dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL", - dbUser = "sa", - dbPassword = "sa", - dbDriver = "org.h2.Driver", - tableName = "s2graph", - maxHFilePerRegionServer = 1, - numRegions = 3, - compressionAlgorithm = "NONE", - buildDegree = false, - autoEdgeCreate = false) - - val s2Config = Management.toConfig(options.toConfigParams) - - val tableName = options.tableName - val schemaVersion = HBaseType.DEFAULT_VERSION - val compressionAlgorithm: String = options.compressionAlgorithm - var s2: S2Graph = _ - - override def beforeAll(): Unit = { - // initialize spark context. - val conf = new SparkConf() - .setMaster(master) - .setAppName(appName) - - sc = new SparkContext(conf) - - s2 = S2GraphHelper.initS2Graph(s2Config) - } + val bulkEdge = s2.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get - override def afterAll(): Unit = { - if (sc != null) sc.stop() - if (s2 != null) s2.shutdown() - } + val indexEdges = ls.flatMap { kv => + serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None) + } + + val indexEdge = indexEdges.head - test("test generateKeyValues edge only.") { - import scala.collection.JavaConverters._ - import org.apache.s2graph.core.storage.CanSKeyValue._ + println(indexEdge) + println(bulkEdge) + bulkEdge shouldBe (indexEdge) + } + test("test generateKeyValues edge only. LocalBulkLoaderTransformer") { val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) /* end of initialize model */ val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}" - val input = sc.parallelize(Seq(bulkEdgeString)) - - val kvs = HFileGenerator.transfer(sc, s2Config, input, options) - val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList + val transformerMode = "local" + val ls = transformToSKeyValues(transformerMode, Seq(bulkEdgeString)) val serDe = s2.defaultStorage.serDe @@ -172,20 +102,16 @@ class GraphFileGeneratorTest extends FunSuite with Matchers with BeforeAndAfterA println(indexEdge) println(bulkEdge) - bulkEdge shouldBe(indexEdge) + bulkEdge shouldBe (indexEdge) } - - test("test generateKeyValues vertex only.") { + test("test generateKeyValues vertex only. 
SparkBulkLoaderTransformer") { val serviceColumn = initTestVertexSchema(s2) val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get - val input = sc.parallelize(Seq(bulkVertexString)) - - val kvs = HFileGenerator.transfer(sc, s2Config, input, options) - - val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList + val transformerMode = "spark" + val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString)) val serDe = s2.defaultStorage.serDe @@ -198,59 +124,62 @@ class GraphFileGeneratorTest extends FunSuite with Matchers with BeforeAndAfterA bulkVertex shouldBe(vertex) } - test("test generateHFile vertex only.") { - val serviceColumn = initTestVertexSchema(s2) -// val lines = Source.fromFile("/tmp/imei-20.txt").getLines().toSeq - val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\ts2graph\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" - val lines = Seq(bulkVertexString) - val input = sc.parallelize(lines) - - val kvs = HFileGenerator.transfer(sc, s2Config, input, options) - println(kvs.count()) - } - - // this test case expect options.input already exist with valid bulk load format. - test("bulk load and fetch vertex: spark mode") { + test("test generateKeyValues vertex only. 
LocalBulkLoaderTransformer") { val serviceColumn = initTestVertexSchema(s2) + val bulkVertexString = "20171201\tinsert\tvertex\t800188448586078\tdevice_profile\timei\t{\"first_time\":\"20171025\",\"last_time\":\"20171112\",\"total_active_days\":14,\"query_amount\":1526.0,\"active_months\":2,\"fua\":\"M5+Note\",\"location_often_province\":\"广ä¸ç\",\"location_often_city\":\"æ·±å³å¸\",\"location_often_days\":6,\"location_last_province\":\"广ä¸ç\",\"location_last_city\":\"æ·±å³å¸\",\"fimei_legality\":3}" + val bulkVertex = s2.elementBuilder.toGraphElement(bulkVertexString, options.labelMapping).get - deleteRecursively(new File(options.tempDir)) - deleteRecursively(new File(options.output)) - - val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq - val input = sc.parallelize(bulkVertexLs) - - HFileGenerator.generate(sc, s2Config, input, options) + val transformerMode = "local" + val ls = transformToSKeyValues(transformerMode, Seq(bulkVertexString)) - val hfileArgs = Array(options.output, options.tableName) - val hbaseConfig = HBaseConfiguration.create() + val serDe = s2.defaultStorage.serDe - val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) + val vertex = serDe.vertexDeserializer(serviceColumn.schemaVersion).fromKeyValues(ls, None).get - val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) - val json = PostProcess.verticesToJson(s2Vertices) + PostProcess.s2VertexToJson(vertex).foreach { jsValue => + println(Json.prettyPrint(jsValue)) + } - println(Json.prettyPrint(json)) + bulkVertex shouldBe(vertex) } - // this test case expect options.input already exist with valid bulk load format. - test("bulk load and fetch vertex: mr mode") { - val serviceColumn = initTestVertexSchema(s2) - - deleteRecursively(new File(options.tempDir)) - deleteRecursively(new File(options.output)) - - val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq - val input = sc.parallelize(bulkVertexLs) - - HFileMRGenerator.generate(sc, s2Config, input, options) - - val hfileArgs = Array(options.output, options.tableName) - val hbaseConfig = HBaseConfiguration.create() - - val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) - val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) - val json = PostProcess.verticesToJson(s2Vertices) - - println(Json.prettyPrint(json)) - } +// this test case expect options.input already exist with valid bulk load format. +// test("bulk load and fetch vertex: spark mode") { +// import scala.collection.JavaConverters._ +// val serviceColumn = initTestVertexSchema(s2) +// +// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq +// val input = sc.parallelize(bulkVertexLs) +// +// HFileGenerator.generate(sc, s2Config, input, options) +// +// val hfileArgs = Array(options.output, options.tableName) +// val hbaseConfig = HBaseConfiguration.create() +// +// val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) +// +// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) +// val json = PostProcess.verticesToJson(s2Vertices) +// +// println(Json.prettyPrint(json)) +// } + +// this test case expect options.input already exist with valid bulk load format. 
+// test("bulk load and fetch vertex: mr mode") { +// val serviceColumn = initTestVertexSchema(s2) +// +// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq +// val input = sc.parallelize(bulkVertexLs) +// +// HFileMRGenerator.generate(sc, s2Config, input, options) +// +// val hfileArgs = Array(options.output, options.tableName) +// val hbaseConfig = HBaseConfiguration.create() +// +// val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) +// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) +// val json = PostProcess.verticesToJson(s2Vertices) +// +// println(Json.prettyPrint(json)) +// } }
