Merge remote-tracking branch 'apache/master' into S2GRAPH-201
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4e758c46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4e758c46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4e758c46

Branch: refs/heads/master
Commit: 4e758c4627187bda7c82fbfb9db8747bffe26448
Parents: 162b460 9dc39ee
Author: DO YUNG YOON <[email protected]>
Authored: Mon Apr 23 11:44:06 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Apr 23 12:21:50 2018 +0900

----------------------------------------------------------------------
 CHANGES                                         |   7 +
 build.sbt                                       |  36 ++
 dev_support/README.md                           |  19 +-
 project/plugins.sbt                             |   2 +
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 108 +++---
 .../tall/SnapshotEdgeDeserializable.scala       |   5 +-
 .../apache/s2graph/graphql/GraphQLServer.scala  |   7 +-
 .../org/apache/s2graph/graphql/HttpServer.scala |   4 +-
 .../apache/s2graph/graphql/bind/AstHelper.scala |  28 ++
 .../s2graph/graphql/bind/Unmarshaller.scala     | 127 +++++++
 .../s2graph/graphql/marshaller/package.scala    | 124 -------
 .../graphql/repository/GraphRepository.scala    |  62 +++-
 .../s2graph/graphql/resolver/Resolver.scala     |  28 --
 .../s2graph/graphql/types/FieldResolver.scala   |  96 ++++++
 .../s2graph/graphql/types/ManagementType.scala  | 312 +++++++++++++++++
 .../graphql/types/S2ManagementType.scala        | 339 -------------------
 .../apache/s2graph/graphql/types/S2Type.scala   | 219 ++++++------
 .../types/SangriaPlayJsonScalarType.scala       |  76 -----
 .../s2graph/graphql/types/SchemaDef.scala       |   4 +-
 .../s2graph/graphql/types/StaticType.scala      | 149 ++++++++
 .../apache/s2graph/graphql/types/package.scala  | 122 +------
 .../apache/s2graph/graphql/ScenarioTest.scala   |  52 +--
 .../org/apache/s2graph/graphql/SchemaTest.scala |  13 +-
 .../org/apache/s2graph/graphql/TestGraph.scala  |  19 +-
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   |  12 +-
 .../s2jobs/loader/GraphFileOptions.scala        |  19 +-
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |   8 +-
 .../s2jobs/loader/HFileMRGenerator.scala        |   5 +-
 .../loader/LocalBulkLoaderTransformer.scala     |   7 +-
 .../loader/SparkBulkLoaderTransformer.scala     |  19 +-
 .../s2graph/s2jobs/serde/Transformer.scala      |   1 -
 .../serde/reader/RowBulkFormatReader.scala      |   4 +-
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  19 +-
 .../serde/writer/RowDataFrameWriter.scala       |  19 ++
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |   4 +-
 .../spark/sql/streaming/S2SinkContext.scala     |   2 +
 .../sql/streaming/S2StreamQueryWriter.scala     |   5 +-
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |   7 +-
 .../s2graph/s2jobs/task/TaskConfTest.scala      |  19 ++
 s2rest_netty/build.sbt                          |   2 -
 s2rest_play/build.sbt                           |   2 -
 41 files changed, 1170 insertions(+), 942 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/CHANGES
----------------------------------------------------------------------
diff --cc CHANGES
index 077f641,f61461c..f8887f6
--- a/CHANGES
+++ b/CHANGES
@@@ -35,6 -35,7 +35,8 @@@ Release Notes - S2Graph - Version 0.2.
    * [S2GRAPH-163] - Update version.sbt after release
    * [S2GRAPH-180] - Implement missing Management API
    * [S2GRAPH-197] - Provide S2graphSink for non-streaming dataset
+   * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
++  * [S2GRAPH-201] - Provide S2GraphSource
 
 ** Bug
    * [S2GRAPH-159] - Wrong syntax at a bash script under Linux

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index ade62fa,b65af21..b0b4aed
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@@ -29,11 -31,38 +29,17 @@@ import play.api.libs.json.{JsObject, Js
  import scala.concurrent.ExecutionContext
  import scala.util.Try
 
- object S2GraphHelper {
-   def initS2Graph(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
-     new S2Graph(config)
+ object S2GraphHelper extends Logger {
+   private var s2Graph:S2Graph = null
+
+   def getS2Graph(config: Config, init:Boolean = false)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+     if (s2Graph == null || init) {
+       logger.info(s"S2Graph initialized..")
+       s2Graph = new S2Graph(config)
+     }
+
+     s2Graph
    }
 
-   def buildDegreePutRequests(s2: S2Graph,
-                              vertexId: String,
-                              labelName: String,
-                              direction: String,
-                              degreeVal: Long): Seq[SKeyValue] = {
-     val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
-     val dir = GraphUtil.directions(direction)
-     val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
-       throw new RuntimeException(s"$vertexId can not be converted into innerval")
-     }
-     val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
-
-     val ts = System.currentTimeMillis()
-     val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-     val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
-
-     edge.edgesWithIndex.flatMap { indexEdge =>
-       s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
-     }
-   }
-
    private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
      val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index 95847f9,36b585e..bf3d25c
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@@ -29,22 -28,14 +28,18 @@@ import org.apache.spark.rdd.RD
 
  import scala.reflect.ClassTag
 
  class SparkBulkLoaderTransformer(val config: Config,
-                                  val options: GraphFileOptions) extends Transformer[RDD] {
+                                  val labelMapping: Map[String, String] = Map.empty,
+                                  val buildDegree: Boolean = false) extends Transformer[RDD] {
+
-   val GraphElementEncoder = org.apache.spark.sql.Encoders.kryo[GraphElement]
-
-   implicit val encoder = GraphElementEncoder
-
+   def this(config: Config, options: GraphFileOptions) =
+     this(config, options.labelMapping, options.buildDegree)
 
    override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = {
      val degrees = elements.mapPartitions { iter =>
-       val s2 = S2SinkContext(config).getGraph
+       val s2 = S2GraphHelper.getS2Graph(config)
 
        iter.flatMap { element =>
-         DegreeKey.fromGraphElement(s2, element).map(_ -> 1L)
-         DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L)
++        DegreeKey.fromGraphElement(s2, element, labelMapping).map(_ -> 1L)
        }
      }.reduceByKey(_ + _)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/RowBulkFormatReader.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
index 7d2b981,0000000..be23628
mode 100644,000000..100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/RowDataFrameWriter.scala
@@@ -1,17 -1,0 +1,36 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+import org.apache.spark.sql.Row
+
+class RowDataFrameWriter extends GraphElementWritable[Row]{
+  override def write(s2: S2Graph)(element: GraphElement): Row = {
+    S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): Row = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+    S2GraphHelper.graphElementToSparkSqlRow(s2, element)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e758c46/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
index 95e2bac,0000000..98b05ac
mode 100644,000000..100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/TaskConfTest.scala
@@@ -1,42 -1,0 +1,61 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import play.api.libs.json.Json
+
+class TaskConfTest extends BaseSparkTest {
+  test("parse dump loader TaskConf") {
+    val s =
+      """
+        |{
+        |  "name": "s2graph_sink",
+        |  "inputs": [
+        |    "filter"
+        |  ],
+        |  "type": "s2graph",
+        |  "options": {
+        |    "writeMethod": "bulk",
+        |    "hbase.zookeeper.quorum": "localhost",
+        |    "db.default.driver": "com.mysql.jdbc.Driver",
+        |    "db.default.url": "jdbc:mysql://localhost:3306/graph_dev",
+        |    "db.default.user": "graph",
+        |    "db.default.password": "graph",
+        |    "--input": "dummy",
+        |    "--tempDir": "dummy",
+        |    "--output": "/tmp/HTableMigrate",
+        |    "--zkQuorum": "localhost",
+        |    "--table": "CopyRated",
+        |    "--dbUrl": "jdbc:mysql://localhost:3306/graph_dev",
+        |    "--dbUser": "graph",
+        |    "--dbPassword": "graph",
+        |    "--dbDriver": "com.mysql.jdbc.Driver",
+        |    "--autoEdgeCreate": "true",
+        |    "--buildDegree": "true"
+        |  }
+        |}
+      """.stripMargin
+
+    implicit val TaskConfReader = Json.reads[TaskConf]
+    val taskConf = Json.parse(s).as[TaskConf]
+
+  }
+}
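
----------------------------------------------------------------------

A minimal caller-side sketch of how the s2jobs entry points touched by this merge fit together. The signatures come from the diffs above; the object name BulkLoadSketch, the use of ConfigFactory.load(), and the parameter values are illustrative assumptions, not part of this commit, and a real run would need a config pointing at a reachable HBase/metadata store.

import com.typesafe.config.ConfigFactory

import org.apache.s2graph.s2jobs.S2GraphHelper
import org.apache.s2graph.s2jobs.loader.SparkBulkLoaderTransformer

object BulkLoadSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical config; in the s2jobs tasks it comes from the task options.
    val config = ConfigFactory.load()

    // getS2Graph caches one S2Graph per JVM and only re-creates it when init = true,
    // which is the S2GRAPH-205 fix for repeated initialization in S2GraphSink.
    val s2 = S2GraphHelper.getS2Graph(config)

    // The transformer now takes labelMapping/buildDegree directly; old call sites that
    // hold a GraphFileOptions keep working through the auxiliary constructor
    // `def this(config: Config, options: GraphFileOptions)` added in this merge.
    val transformer = new SparkBulkLoaderTransformer(config, labelMapping = Map.empty, buildDegree = true)
  }
}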
