Repository: incubator-s2graph Updated Branches: refs/heads/master ce95fd8fc -> b7fc8085c
- fix warnings from comments. - fix warnings from tuple. - fix type parameter name collision with tinkerpop.T. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/843cdbc9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/843cdbc9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/843cdbc9 Branch: refs/heads/master Commit: 843cdbc966c6a88b5c187a9f1e3237385753706c Parents: ea734a9 Author: DO YUNG YOON <steams...@apache.org> Authored: Fri Aug 11 19:35:11 2017 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Fri Aug 11 22:26:01 2017 +0900 ---------------------------------------------------------------------- .../apache/s2graph/core/ExceptionHandler.scala | 2 +- .../org/apache/s2graph/core/JSONParser.scala | 11 ++-- .../org/apache/s2graph/core/Management.scala | 6 +- .../org/apache/s2graph/core/QueryResult.scala | 4 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 10 +-- .../scala/org/apache/s2graph/core/S2Graph.scala | 67 ++++++++++---------- .../apache/s2graph/core/S2GraphFactory.scala | 2 +- .../org/apache/s2graph/core/S2Property.scala | 1 - .../org/apache/s2graph/core/mysqls/Bucket.scala | 2 +- .../org/apache/s2graph/core/mysqls/Label.scala | 4 +- .../s2graph/core/rest/RequestParser.scala | 5 +- .../apache/s2graph/core/storage/SKeyValue.scala | 2 +- .../apache/s2graph/core/storage/Storage.scala | 32 +++++----- .../core/storage/hbase/AsynchbaseStorage.scala | 12 ++-- .../tall/IndexEdgeDeserializable.scala | 6 +- .../wide/IndexEdgeDeserializable.scala | 6 +- .../org/apache/s2graph/rest/netty/Server.scala | 1 + .../apache/s2graph/rest/play/Bootstrap.scala | 2 +- .../rest/play/controllers/AdminController.scala | 2 +- .../rest/play/controllers/EdgeController.scala | 4 +- 20 files changed, 90 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala index 48976d3..eb5b1da 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala @@ -96,7 +96,7 @@ object ExceptionHandler { private def toKafkaProp(config: Config) = { val props = new Properties() - /** all default configuration for new producer */ + /* all default configuration for new producer */ val brokers = if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list") else "localhost" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala index ea50b17..9de3d9d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala @@ -302,13 +302,12 @@ object JSONParser { def jsValueToAny(value: JsValue): Option[AnyRef] = { try { - val v = value match { -// case JsNull => - case n: JsNumber => n.value - case s: JsString => TemplateHelper.replaceVariable(System.currentTimeMillis(), s.value) - case b: JsBoolean => Boolean.box(b.value) + value match { + case n: JsNumber => Option(n.value) + case s: JsString => Option(TemplateHelper.replaceVariable(System.currentTimeMillis(), s.value)) + case b: JsBoolean => Option(Boolean.box(b.value)) + case _ => None } - Option(v) } catch { case e: Exception => logger.error(s"jsValueToAny: $value", e) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 6119045..a9741d2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -299,7 +299,7 @@ class Management(graph: S2Graph) { Model withTx { implicit session => val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false) - /** create hbase table for service */ + /* create hbase table for service */ graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) service } @@ -380,14 +380,14 @@ class Management(graph: S2Graph) { Model withTx { implicit session => if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.") - /** create all models */ + /* create all models */ val newLabel = Label.insertAll(label, srcServiceName, srcColumnName, srcColumnType, tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, indices, props, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options) - /** create hbase table */ + /* create hbase table */ val storage = graph.getStorage(newLabel) val service = newLabel.service storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index a7f485c..3916f39 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -290,7 +290,7 @@ object StepResult { (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1) aggregated = agg(groupByKey) if aggregated.nonEmpty sorted = orderBy(globalQueryOption, aggregated) - } yield groupByKey -> (scoreSum, sorted) + } yield (groupByKey, (scoreSum, sorted)) (Nil, grouped) } else { val ordered = orderBy(globalQueryOption, ls) @@ -319,7 +319,7 @@ object StepResult { (key, (scoreSum, values)) <- baseStepResult.grouped (out, in) = values.partition(v => filterOutSet.contains(v.filterOutValues)) newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => prev + current.score } if in.nonEmpty - } yield key -> (newScoreSum, in) + } yield (key, (newScoreSum, in)) StepResult(edgeWithScores = filteredResults, grouped = grouped, baseStepResult.degreeEdges, cursors = baseStepResult.cursors, failCount = baseStepResult.failCount + filterOutStepResult.failCount) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index fa9ff62..7165579 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -142,7 +142,7 @@ case class SnapshotEdge(graph: S2Graph, override def toString(): String = { Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction, "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, - "statusCode" -> statusCode, "lockTs" -> lockTs).toString + "statusCode" -> statusCode, "lockTs" -> lockTs).toString() } } @@ -206,7 +206,7 @@ case class IndexEdge(graph: S2Graph, propsWithTs.get(meta.name) match { case null => - /** + /* * TODO: agly hack * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once */ @@ -297,7 +297,7 @@ case class IndexEdge(graph: S2Graph, override def toString(): String = { Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir, "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString - ).toString + ).toString() } } @@ -925,12 +925,12 @@ object S2Edge { for { (requestEdge, func) <- requestWithFuncs } { - val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer) + val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)) prevPropsWithTs = _newPropsWithTs // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") } val requestTs = requestEdge.ts - /** version should be monotoniously increasing so our RPC mutation should be applied safely */ + /* version should be monotoniously increasing so our RPC mutation should be applied safely */ val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs) val maxTs = prevPropsWithTs.map(_._2.ts).max val newTs = if (maxTs > requestTs) maxTs else requestTs http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 890dc4d..d1eda5e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -22,6 +22,7 @@ package org.apache.s2graph.core import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.{Executors, TimeUnit} +import java.util.function.Consumer import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} @@ -239,8 +240,8 @@ object S2Graph { tsVal } - def processDuplicates[T](queryParam: QueryParam, - duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = { + def processDuplicates[R](queryParam: QueryParam, + duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = { if (queryParam.label.consistencyLevel != "strong") { //TODO: @@ -309,7 +310,7 @@ object S2Graph { edgeWithScore } - /** process step group by */ + /* process step group by */ val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) @@ -319,25 +320,25 @@ object S2Graph { val score = edgeWithScore.score val label = edgeWithScore.label - /** Select */ + /* Select */ val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) // val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) - /** OrderBy */ + /* OrderBy */ val orderByValues = if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) - /** StepGroupBy */ + /* StepGroupBy */ val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) - /** GroupBy */ + /* GroupBy */ val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) - /** FilterOut */ + /* FilterOut */ val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) newEdgeWithScore.copy(orderByValues = orderByValues, @@ -346,13 +347,13 @@ object S2Graph { filterOutValues = filterOutValues) } - /** process step group by */ + /* process step group by */ val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) - /** process ordered list */ + /* process ordered list */ val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil - /** process grouped list */ + /* process grouped list */ val grouped = if (queryOption.groupBy.keys.isEmpty) Nil else { @@ -365,13 +366,13 @@ object S2Graph { val newScoreSum = scoreSum - /** + /* * watch out here. by calling toString on Any, we lose type information which will be used * later for toJson. */ if (merged.nonEmpty) { val newKey = merged.head.groupByValues - agg += (newKey -> (newScoreSum, merged)) + agg += ((newKey, (newScoreSum, merged))) } } agg.toSeq.sortBy(_._2._1 * -1) @@ -399,7 +400,7 @@ object S2Graph { val score = edgeWithScore.score val label = edgeWithScore.label - /** Select */ + /* Select */ val mergedPropsWithTs = if (queryOption.selectColumns.isEmpty) { edge.propertyValuesInner() @@ -450,17 +451,17 @@ object S2Graph { } } - private def buildResult[T](query: Query, + private def buildResult[R](query: Query, stepIdx: Int, stepResultLs: Seq[(QueryRequest, StepResult)], parentEdges: Map[VertexId, Seq[EdgeWithScore]]) - (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T) - (implicit ev: WithScore[T]): ListBuffer[T] = { + (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R) + (implicit ev: WithScore[R]): ListBuffer[R] = { import scala.collection._ - val results = ListBuffer.empty[T] - val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty - val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty + val results = ListBuffer.empty[R] + val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty + val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty @@ -488,7 +489,7 @@ object S2Graph { val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) // params += (hashKey -> queryParam) // - /** check if this edge should be exlcuded. */ + /* check if this edge should be exlcuded. */ if (shouldBeExcluded) { edgesToExclude.add(filterHashKey) } else { @@ -500,7 +501,7 @@ object S2Graph { sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) duplicates.get(hashKey) match { case None => - val newLs = ListBuffer.empty[(FilterHashKey, T)] + val newLs = ListBuffer.empty[(FilterHashKey, R)] newLs += (filterHashKey -> newEdgeWithScore) duplicates += (hashKey -> newLs) // case Some(old) => @@ -547,7 +548,7 @@ object S2Graph { new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD) )) @Graph.OptOuts(value = Array( - /** Process */ + /* Process */ /* branch: passed all. */ // new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.branch.BranchTest$Traversals", method = "*", reason = "no"), // passed: all @@ -769,7 +770,7 @@ object S2Graph { // new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest", method = "*", reason = "no"), // passed: all - /** Structure */ + /* Structure */ new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateIdEquality", reason="reference equals on EdgeId is not supported."), new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method="shouldValidateEquality", reason="reference equals on EdgeId is not supported."), // passed: all, failed: none @@ -967,7 +968,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def flushStorage(): Unit = { storagePool.foreach { case (_, storage) => - /** flush is blocking */ + /* flush is blocking */ storage.flush() } } @@ -1149,7 +1150,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { - /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache + /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache * so use empty cacheKey. * */ val queryParam = QueryParam(labelName = edge.innerLabel.label, @@ -1194,7 +1195,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val requestTs = ts val vertices = srcVertices - /** create query per label */ + /* create query per label */ val queries = for { label <- labels } yield { @@ -1252,7 +1253,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val ret = label.schemaVersion match { case HBaseType.VERSION3 | HBaseType.VERSION4 => if (label.consistencyLevel == "strong") { - /** + /* * read: snapshotEdge on queryResult = O(N) * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) */ @@ -1262,7 +1263,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } case _ => - /** + /* * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ @@ -1369,7 +1370,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) - /** multiple edges with weak consistency level will be processed as batch */ + /* multiple edges with weak consistency level will be processed as batch */ val mutations = edges.flatMap { edge => val (_, edgeUpdate) = if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) @@ -1450,7 +1451,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val parts = GraphUtil.split(s) val logType = parts(2) val element = if (logType == "edge" | logType == "e") { - /** current only edge is considered to be bulk loaded */ + /* current only edge is considered to be bulk loaded */ labelMapping.get(parts(5)) match { case None => case Some(toReplace) => @@ -1754,7 +1755,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId] case id: EdgeId => id case s: String => EdgeId.fromString(s) - case s: java.lang.String => EdgeId.fromString(s) } val edgesToFetch = for { id <- s2EdgeIds @@ -1937,8 +1937,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph override def toString(): String = "[s2graph]" override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = { - builder.graph(this).registry(new S2GraphIoRegistry).create().asInstanceOf[I] - + builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I] } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala index 7f13711..a9c98c2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala @@ -119,7 +119,7 @@ object S2GraphFactory { true, S2Graph.DefaultServiceName, Nil, Seq(Prop("weight", "0.0", "double")), "strong", None, None) } - def cleanupDefaultSchema: Unit = { + def cleanupDefaultSchema(): Unit = { val columnNames = Set(S2Graph.DefaultColumnName, "person", "software", "product", "dog", "animal", "song", "artist", "STEPHEN") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala index 6b0c0eb..e86c17f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala @@ -49,7 +49,6 @@ object S2Property { case _: Boolean => true case _: Short => true case _: Byte => true - case _: String => true case _: BigDecimal => true case _ => false } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala index 43e5db8..e71bbce 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala @@ -51,7 +51,7 @@ object Bucket extends Model[Bucket] { def toRange(str: String): Option[(Int, Int)] = { val range = str.split(rangeDelimiter) - if (range.length == 2) Option(range.head.toInt, range.last.toInt) + if (range.length == 2) Option((range.head.toInt, range.last.toInt)) else None } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index 415a64e..97fd704 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -189,14 +189,14 @@ object Label extends Model[Label] { val tgtServiceId = tgtService.id.get val serviceId = service.id.get - /** insert serviceColumn */ + /* insert serviceColumn */ val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType)) val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType)) if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}") if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}") - /** create label */ + /* create label */ Label.findByName(labelName, useCache = false).getOrElse { val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index a2f5c47..4bc9376 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -259,6 +259,7 @@ class RequestParser(graph: S2Graph) { case arr: JsArray => val keys = arr.asOpt[Seq[String]].getOrElse(Nil) GroupBy(keys) + case _ => GroupBy.Empty }.getOrElse(GroupBy.Empty) def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[S2Vertex] = { @@ -514,7 +515,7 @@ class RequestParser(graph: S2Graph) { private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = { (js \ key).validate[R] match { case JsError(errors) => - val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption) + val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption) val e = Json.obj("args" -> key, "error" -> msg) throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString) case JsSuccess(result, _) => result @@ -524,7 +525,7 @@ class RequestParser(graph: S2Graph) { private def parseOption[R](js: JsValue, key: String)(implicit read: Reads[R]): Option[R] = { (js \ key).validateOpt[R] match { case JsError(errors) => - val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption) + val msg = (JsError.toJson(errors) \ "obj").as[List[JsValue]].flatMap(x => (x \ "msg").toOption) val e = Json.obj("args" -> key, "error" -> msg) throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString) case JsSuccess(result, _) => result http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala index bde7f3a..db9a9da 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -43,7 +43,7 @@ case class SKeyValue(table: Array[Byte], def toLogString = { Map("table" -> Bytes.toString(table), "row" -> row.toList, "cf" -> Bytes.toString(cf), "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp, - "operation" -> operation, "durability" -> durability).toString + "operation" -> operation, "durability" -> durability).toString() } override def toString(): String = toLogString http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index a8dec7e..57d4872 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -398,7 +398,7 @@ abstract class Storage[Q, R](val graph: S2Graph, future recoverWith { case FetchTimeoutException(retryEdge) => logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") - /** fetch failed. re-fetch should be done */ + /* fetch failed. re-fetch should be done */ fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } @@ -413,14 +413,14 @@ abstract class Storage[Q, R](val graph: S2Graph, } logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") - /** retry logic */ + /* retry logic */ val promise = Promise[Boolean] val backOff = exponentialBackOff(tryNum) scheduledThreadPool.schedule(new Runnable { override def run(): Unit = { val future = if (failedStatusCode == 0) { // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge. - /** fetch failed. re-fetch should be done */ + /* fetch failed. re-fetch should be done */ fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } @@ -453,7 +453,7 @@ abstract class Storage[Q, R](val graph: S2Graph, case 0 => fetchedSnapshotEdgeOpt match { case None => - /** + /* * no one has never mutated this SN. * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges) * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1) @@ -475,7 +475,7 @@ abstract class Storage[Q, R](val graph: S2Graph, case Some(snapshotEdge) => snapshotEdge.pendingEdgeOpt match { case None => - /** + /* * others finished commit on this SN. but there is no contention. * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges) * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ? @@ -497,7 +497,7 @@ abstract class Storage[Q, R](val graph: S2Graph, case Some(pendingEdge) => val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() if (isLockExpired) { - /** + /* * if pendingEdge.ts == snapshotEdge.ts => * (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge)) * else => @@ -519,7 +519,7 @@ abstract class Storage[Q, R](val graph: S2Graph, commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) } else { - /** + /* * others finished commit on this SN and there is currently contention. * this can't be proceed so retry from re-fetch. * throw EX @@ -532,11 +532,11 @@ abstract class Storage[Q, R](val graph: S2Graph, } case _ => - /** + /* * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock */ - /** + /* * this succeed to lock this SN. keep doing on commit process. * if SN.isEmpty => * no one never succed to commit on this SN. @@ -807,7 +807,7 @@ abstract class Storage[Q, R](val graph: S2Graph, buildIncrementsAsync(indexEdge, -1L) } - /** reverted direction */ + /* reverted direction */ val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ @@ -972,7 +972,7 @@ abstract class Storage[Q, R](val graph: S2Graph, tgtVertexIdOpt match { case Some(tgtVertexId) => // _to is given. - /** we use toSnapshotEdge so dont need to swap src, tgt */ + /* we use toSnapshotEdge so dont need to swap src, tgt */ val src = srcVertex.innerId val tgt = tgtVertexId val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt)) @@ -989,7 +989,7 @@ abstract class Storage[Q, R](val graph: S2Graph, } protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { - /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache + /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache * so use empty cacheKey. * */ val queryParam = QueryParam(labelName = edge.innerLabel.label, @@ -1042,21 +1042,21 @@ abstract class Storage[Q, R](val graph: S2Graph, def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = { (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match { case (true, true) => - /** when there is no need to update. shouldUpdate == false */ + /* when there is no need to update. shouldUpdate == false */ Nil -> Nil case (true, false) => - /** no edges to delete but there is new edges to insert so increase degree by 1 */ + /* no edges to delete but there is new edges to insert so increase degree by 1 */ val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree) buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_)) case (false, true) => - /** no edges to insert but there is old edges to delete so decrease degree by 1 */ + /* no edges to insert but there is old edges to delete so decrease degree by 1 */ val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree) buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1)) case (false, false) => - /** update on existing edges so no change on degree */ + /* update on existing edges so no change on degree */ Nil -> Nil } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index e4d85cf..4fb2240 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -220,7 +220,7 @@ class AsynchbaseStorage(override val graph: S2Graph, val _client = client(withWait) val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment) - /** Asynchbase IncrementRequest does not implement HasQualifiers */ + /* Asynchbase IncrementRequest does not implement HasQualifiers */ val incrementsFutures = increments.map { kv => val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) val defer = _client.atomicIncrement(inc) @@ -231,7 +231,7 @@ class AsynchbaseStorage(override val graph: S2Graph, if (withWait) future else Future.successful(true) } - /** PutRequest and DeleteRequest accept byte[][] qualifiers/values. */ + /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */ val othersFutures = putAndDeletes.groupBy { kv => (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp) }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) => @@ -362,7 +362,7 @@ class AsynchbaseStorage(override val graph: S2Graph, } (_startKey , Bytes.add(baseKey, intervalMinBytes)) } else { - /** + /* * note: since propsToBytes encode size of property map at first byte, we are sure about max value here */ val _startKey = queryParam.cursorOpt match { @@ -449,7 +449,7 @@ class AsynchbaseStorage(override val graph: S2Graph, val queryParam = queryRequest.queryParam val cacheTTL = queryParam.cacheTTLInMillis - /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ val edge = toRequestEdge(queryRequest, parentEdges) val request = buildRequest(queryRequest, edge) @@ -562,7 +562,7 @@ class AsynchbaseStorage(override val graph: S2Graph, compressionAlgorithm: String, replicationScopeOpt: Option[Int] = None, totalRegionCount: Option[Int] = None): Unit = { - /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */ + /* TODO: Decide if we will allow each app server to connect to multiple hbase cluster */ for { zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq } { @@ -789,7 +789,7 @@ class AsynchbaseStorage(override val graph: S2Graph, } private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = { - /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ hbaseRpc match { case Left(getRequest) => getRequest.key case Right(ScanWithRange(scanner, offset, limit)) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 6095cea..3da8267 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -111,7 +111,7 @@ class IndexEdgeDeserializable(graph: S2Graph, } val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) - /** process indexProps */ + /* process indexProps */ val size = idxPropsRaw.length (0 until size).foreach { ith => val meta = index.sortKeyTypesArray(ith) @@ -125,7 +125,7 @@ class IndexEdgeDeserializable(graph: S2Graph, } } - /** process props */ + /* process props */ if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) @@ -139,7 +139,7 @@ class IndexEdgeDeserializable(graph: S2Graph, } } - /** process tgtVertexId */ + /* process tgtVertexId */ val tgtVertexId = if (edge.checkProperty(LabelMeta.to.name)) { val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 6818c1d..59db07e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -87,7 +87,7 @@ class IndexEdgeDeserializable(graph: S2Graph, val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) - /** process indexProps */ + /* process indexProps */ val size = idxPropsRaw.length (0 until size).foreach { ith => val meta = index.sortKeyTypesArray(ith) @@ -101,7 +101,7 @@ class IndexEdgeDeserializable(graph: S2Graph, } } - /** process props */ + /* process props */ if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) @@ -114,7 +114,7 @@ class IndexEdgeDeserializable(graph: S2Graph, edge.propertyInner(k.name, v.value, version) } } - /** process tgtVertexId */ + /* process tgtVertexId */ val tgtVertexId = if (edge.checkProperty(LabelMeta.to.name)) { val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala index a47fda7..126f193 100644 --- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala +++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala @@ -46,6 +46,7 @@ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.io.Source import scala.util.{Failure, Success, Try} +import scala.language.existentials class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] { val ApplicationJson = "application/json" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala index e5fc75d..ff82c44 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala @@ -83,7 +83,7 @@ object Global extends WithFilters(new GzipFilter()) { wallLogHandler.shutdown() QueueActor.shutdown() - /** + /* * shutdown hbase client for flush buffers. */ shutdown() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala index ecea304..8eb25fd 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala @@ -100,7 +100,7 @@ object AdminController extends Controller { case Failure(error) => logger.error(error.getMessage, error) error match { - case JsResultException(e) => bad(JsError.toFlatJson(e)) + case JsResultException(e) => bad(JsError.toJson(e)) case _ => bad(error.getMessage) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/843cdbc9/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index aed8ced..28da7fe 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -129,7 +129,7 @@ object EdgeController extends Controller { }.map(jsonResponse(_)) } else { val rets = elementWithIdxs.map { case ((element, tsv), idx) => - if (!skipElement(element.isAsync)) QueueActor.router ! (element, tsv) + if (!skipElement(element.isAsync)) QueueActor.router ! ((element, tsv)) true } Future.successful(jsonResponse(Json.toJson(rets))) @@ -251,7 +251,7 @@ object EdgeController extends Controller { def deleteAllInner(jsValue: JsValue, withWait: Boolean) = { - /** logging for delete all request */ + /* logging for delete all request */ def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = { val kafkaMessages = for { id <- ids