http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala deleted file mode 100644 index f04e342..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala +++ /dev/null @@ -1,104 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.google.common.cache.Cache -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.types.{LabelWithDirection, VertexId} -import com.kakao.s2graph.core.utils.logger -import scala.collection.{Map, Seq} -import scala.concurrent.{Future, ExecutionContext} -import scala.util.Try - -abstract class QueryBuilder[R, T](storage: Storage)(implicit ec: ExecutionContext) { - - def buildRequest(queryRequest: QueryRequest): R - - def getEdge(srcVertex: Vertex, tgtVertex: Vertex, queryParam: QueryParam, isInnerCall: Boolean): T - - def fetch(queryRequest: QueryRequest, - prevStepScore: Double, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): T - - def toCacheKeyBytes(request: R): Array[Byte] - - def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]] - - - def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = { - if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil) - else { - val queryRequest = queryRequestWithResultsLs.head.queryRequest - val q = orgQuery - val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult) - - val stepIdx = queryRequest.stepIdx + 1 - - val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None - val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) - val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) - val step = q.steps(stepIdx) - val alreadyVisited = - if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean] - else Graph.alreadyVisitedVertices(queryResultsLs) - - val groupedBy = queryResultsLs.flatMap { queryResult => - queryResult.edgeWithScoreLs.map { case edgeWithScore => - edgeWithScore.edge.tgtVertex -> edgeWithScore - } - }.groupBy { case (vertex, edgeWithScore) => vertex } - - val groupedByFiltered = for { - (vertex, edgesWithScore) <- groupedBy - aggregatedScore = edgesWithScore.map(_._2.score).sum if aggregatedScore >= prevStepThreshold - } yield vertex -> aggregatedScore - - val prevStepTgtVertexIdEdges = for { - (vertex, edgesWithScore) <- groupedBy - } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) => edgeWithScore } - - val nextStepSrcVertices = if (prevStepLimit >= 0) { - groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) - } else { - groupedByFiltered.toSeq - } - - val queryRequests = for { - (vertex, prevStepScore) <- nextStepSrcVertices - queryParam <- step.queryParams - } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore) - - Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec) - } - } - - def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = { - for { - queryRequestWithResultLs <- queryRequestWithResultLsFuture - ret <- 
fetchStep(orgQuery, queryRequestWithResultLs) - } yield ret - } - - def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = { - val fallback = { - val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty) - Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult()))) - } - Try { - if (q.steps.isEmpty) { - // TODO: this should be get vertex query. - fallback - } else { - // current stepIdx = -1 - val startQueryResultLs = QueryResult.fromVertices(q) - q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) => - fetchStepFuture(q, acc) - } - } - } recover { - case e: Exception => - logger.error(s"getEdgesAsync: $e", e) - fallback - } get - } -}
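The step-expansion logic deleted above is not lost: fetchStep and getEdges reappear in Storage.scala further down. Per step, edge scores are aggregated by target vertex, vertices under the previous step's nextStepScoreThreshold are dropped, and the top nextStepLimit vertices seed the next step's QueryRequests. A minimal sketch of that aggregation, using hypothetical stand-in types rather than the project's QueryResult/EdgeWithScore API:

    // Hypothetical simplified types; the real code walks QueryResult.edgeWithScoreLs.
    case class EdgeScore(tgtVertexId: String, score: Double)

    def nextStepVertices(edges: Seq[EdgeScore],
                         threshold: Double,   // Step.nextStepScoreThreshold
                         limit: Int           // Step.nextStepLimit, negative means unlimited
                        ): Seq[(String, Double)] = {
      val aggregated = edges
        .groupBy(_.tgtVertexId)                             // group incoming edges by target vertex
        .map { case (v, es) => v -> es.map(_.score).sum }   // sum scores per vertex
        .filter { case (_, score) => score >= threshold }   // apply the previous step's threshold
        .toSeq
      if (limit >= 0) aggregated.sortBy(-_._2).take(limit)  // keep the top-scoring vertices
      else aggregated
    }

Each surviving (vertex, score) pair is then crossed with the step's queryParams to build the next round of QueryRequests.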
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala index c2c822b..a0aa261 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala @@ -1,11 +1,28 @@ package com.kakao.s2graph.core.storage +import org.apache.hadoop.hbase.util.Bytes import org.hbase.async.KeyValue -case class SKeyValue(table: Array[Byte], row: Array[Byte], cf: Array[Byte], qualifier: Array[Byte], value: Array[Byte], timestamp: Long) { + +object SKeyValue { + val Put = 1 + val Delete = 2 + val Increment = 3 + val Default = Put +} +case class SKeyValue(table: Array[Byte], + row: Array[Byte], + cf: Array[Byte], + qualifier: Array[Byte], + value: Array[Byte], + timestamp: Long, + operation: Int = SKeyValue.Default) { def toLogString = { - Map("table" -> table.toList, "row" -> row.toList, "cf" -> cf.toList, "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp).toString + Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf), + "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp, + "operation" -> operation).toString } + override def toString(): String = toLogString } trait CanSKeyValue[T] { @@ -28,4 +45,3 @@ object CanSKeyValue { // For hbase KeyValues } - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala new file mode 100644 index 0000000..bee064b --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala @@ -0,0 +1,10 @@ +package com.kakao.s2graph.core.storage + +import com.kakao.s2graph.core.storage.StorageSerializable + +object Serializable { + val vertexCf = "v".getBytes() + val edgeCf = "e".getBytes() +} + +trait Serializable[E] extends StorageSerializable[E] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala new file mode 100644 index 0000000..d4f55f0 --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala @@ -0,0 +1,142 @@ +package com.kakao.s2graph.core.storage + +import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta} +import com.kakao.s2graph.core.storage.{CanSKeyValue, SKeyValue, StorageDeserializable} +import com.kakao.s2graph.core.types._ +import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex} +import org.apache.hadoop.hbase.util.Bytes + +class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { + + import StorageDeserializable._ + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, 
+ cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { + queryParam.label.schemaVersion match { + case HBaseType.VERSION2 | HBaseType.VERSION1 => fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt) + case _ => fromKeyValuesInnerV3(queryParam, _kvs, version, cacheElementOpt) + } + } + + def statusCodeWithOp(byte: Byte): (Byte, Byte) = { + val statusCode = byte >> 4 + val op = byte & ((1 << 4) - 1) + (statusCode.toByte, op.toByte) + } + + def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + assert(kvs.size == 1) + + val kv = kvs.head + val schemaVer = queryParam.label.schemaVersion + val cellVersion = kv.timestamp + + val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = { + val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer) + var pos = 0 + val (statusCode, op) = statusCodeWithOp(kv.value(pos)) + pos += 1 + val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + val kvsMap = props.toMap + val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong + + pos = endAt + val _pendingEdgeOpt = + if (pos == kv.value.length) None + else { + val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) + pos += 1 + // val versionNum = Bytes.toLong(kv.value, pos, 8) + // pos += 8 + val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + pos = endAt + val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) + + val pendingEdge = + Edge(Vertex(srcVertexId, cellVersion), + Vertex(tgtVertexId, cellVersion), + labelWithDir, pendingEdgeOp, + cellVersion, pendingEdgeProps.toMap, + statusCode = pendingEdgeStatusCode, lockTs = lockTs) + Option(pendingEdge) + } + + (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt) + } + + SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), + labelWithDir, op, cellVersion, props, statusCode = statusCode, + pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) + } + + private def fromKeyValuesInnerV3[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + assert(kvs.size == 1) + + val kv = kvs.head + val schemaVer = queryParam.label.schemaVersion + val cellVersion = kv.timestamp + /** rowKey */ + def parseRowV3(kv: SKeyValue, version: String) = { + var pos = 0 + val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version) + pos += srcIdAndTgtIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + + val rowLen = srcIdAndTgtIdLen + 4 + 1 + (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen) + + } + val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e => + (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) + }.getOrElse(parseRowV3(kv, schemaVer)) + + val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId) + val tgtVertexId = 
SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId) + + val (props, op, ts, statusCode, _pendingEdgeOpt) = { + var pos = 0 + val (statusCode, op) = statusCodeWithOp(kv.value(pos)) + pos += 1 + val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + val kvsMap = props.toMap + val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong + + pos = endAt + val _pendingEdgeOpt = + if (pos == kv.value.length) None + else { + val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) + pos += 1 + // val versionNum = Bytes.toLong(kv.value, pos, 8) + // pos += 8 + val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + pos = endAt + val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) + + val pendingEdge = + Edge(Vertex(srcVertexId, cellVersion), + Vertex(tgtVertexId, cellVersion), + labelWithDir, pendingEdgeOp, + cellVersion, pendingEdgeProps.toMap, + statusCode = pendingEdgeStatusCode, lockTs = lockTs) + Option(pendingEdge) + } + + (kvsMap, op, ts, statusCode, _pendingEdgeOpt) + } + + SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), + labelWithDir, op, cellVersion, props, statusCode = statusCode, + pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala new file mode 100644 index 0000000..9e6e1b7 --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala @@ -0,0 +1,76 @@ +package com.kakao.s2graph.core.storage + +import com.kakao.s2graph.core.SnapshotEdge +import com.kakao.s2graph.core.mysqls.LabelIndex +import com.kakao.s2graph.core.types.{HBaseType, SourceAndTargetVertexIdPair, VertexId} +import org.apache.hadoop.hbase.util.Bytes + + +class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { + import StorageSerializable._ + + val label = snapshotEdge.label + val table = label.hbaseTableName.getBytes() + val cf = Serializable.edgeCf + + def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { + val byte = (((statusCode << 4) | op).toByte) + Array.fill(1)(byte.toByte) + } + def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), + propsToKeyValuesWithTs(snapshotEdge.props.toList)) + + override def toKeyValues: Seq[SKeyValue] = { + label.schemaVersion match { + case HBaseType.VERSION1 | HBaseType.VERSION2 => toKeyValuesInner + case _ => toKeyValuesInnerV3 + } + } + + private def toKeyValuesInner: Seq[SKeyValue] = { + val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes + val labelWithDirBytes = snapshotEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) + + val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes + + val qualifier = tgtIdBytes + + val value = snapshotEdge.pendingEdgeOpt match { + case None => valueBytes() + case Some(pendingEdge) => + val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) + val versionBytes = Array.empty[Byte] + val propsBytes = 
propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) + val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) + Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) + } + val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) + Seq(kv) + } + + private def toKeyValuesInnerV3: Seq[SKeyValue] = { + val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes + val labelWithDirBytes = snapshotEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) + + val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + + val qualifier = Array.empty[Byte] + + val value = snapshotEdge.pendingEdgeOpt match { + case None => valueBytes() + case Some(pendingEdge) => + val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) + val versionBytes = Array.empty[Byte] + val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) + val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) + + Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) + } + + val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) + Seq(kv) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala index c9e768e..bca8df3 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala @@ -1,64 +1,324 @@ package com.kakao.s2graph.core.storage -import com.google.common.cache.Cache +import com.kakao.s2graph.core.ExceptionHandler.{Key, Val, KafkaMessage} +import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.{Service, Label} -import com.kakao.s2graph.core.utils.logger +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.types._ +import com.kakao.s2graph.core.utils.{Extensions, logger} import com.typesafe.config.Config - - +import org.apache.hadoop.hbase.util.Bytes +import org.apache.kafka.clients.producer.ProducerRecord +import scala.annotation.tailrec import scala.collection.Seq +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try +import scala.util.{Random, Try} + +abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { + /** storage dependent configurations */ + val MaxRetryNum = config.getInt("max.retry.number") + val MaxBackOff = config.getInt("max.back.off") + val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") + val FailProb = config.getDouble("hbase.fail.prob") + val LockExpireDuration = Math.max(MaxRetryNum * MaxBackOff * 2, 10000) + val maxSize = config.getInt("future.cache.max.size") + val expireAfterWrite = config.getInt("future.cache.expire.after.write") + val expireAfterAccess = config.getInt("future.cache.expire.after.access") + + + /** + * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue] + * so we can store this kvs. 
+   * @param snapshotEdge: snapshotEdge to serialize
+   * @return serializer implementation of StorageSerializable whose toKeyValues returns Seq[SKeyValue]
+   */
+  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = new SnapshotEdgeSerializable(snapshotEdge)
+
+  /**
+   * create a serializer that knows how to convert the given indexEdge into kvs: Seq[SKeyValue]
+   * @param indexedEdge: indexEdge to serialize
+   * @return serializer implementation
+   */
+  def indexEdgeSerializer(indexedEdge: IndexEdge) = new IndexEdgeSerializable(indexedEdge)
+
+  /**
+   * create a serializer that knows how to convert the given vertex into kvs: Seq[SKeyValue]
+   * @param vertex: vertex to serialize
+   * @return serializer implementation
+   */
+  def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
+
+  /**
+   * create a deserializer that can parse a stored CanSKeyValue into a snapshotEdge.
+   * note that each storage implementation should provide an implicit type class instance
+   * that converts its storage-dependent data type into the common SKeyValue type by implementing CanSKeyValue.
+   *
+   * ex) Asynchbase uses its own KeyValue class, and the CanSKeyValue object carries the implicit conversion for it.
+   * if any storage uses a different class to represent stored byte arrays,
+   * then that storage implementation is responsible for providing an implicit conversion on CanSKeyValue.
+   * */
+  val snapshotEdgeDeserializer = new SnapshotEdgeDeserializable
-abstract class Storage(val config: Config)(implicit ec: ExecutionContext) {
+  /** create a deserializer that can parse a stored CanSKeyValue into an indexEdge. */
+  val indexEdgeDeserializer = new IndexEdgeDeserializable
-  def cacheOpt: Option[Cache[Integer, Seq[QueryResult]]]
+  /** create a deserializer that can parse a stored CanSKeyValue into a vertex. */
+  val vertexDeserializer = new VertexDeserializable
-  def vertexCacheOpt: Option[Cache[Integer, Option[Vertex]]]
-  // Serializer/Deserializer
-  def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): StorageSerializable[SnapshotEdge]
+  /**
+   * decide how to store the given key values Seq[SKeyValue] into storage using the storage's client.
+   * note that this should return true only when all writes succeed.
+   * we assume that each storage implementation has its client as a member variable.
+   *
+   * @param cluster: where these key values should be stored.
+   * @param kvs: sequence of SKeyValue that need to be stored in storage.
+   * @param withWait: flag to control waiting for the ack from storage.
+   * note that AsynchbaseStorage (which supports asynchronous operations) never blocks a thread
+   * even with withWait = true; it submits the work and is notified by the event loop once storage acks.
+   * @return ack message from storage.
+   */
+  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean]
-  def indexEdgeSerializer(indexedEdge: IndexEdge): StorageSerializable[IndexEdge]
+//  def writeToStorage(kv: SKeyValue, withWait: Boolean): Future[Boolean]
-  def vertexSerializer(vertex: Vertex): StorageSerializable[Vertex]
+  /**
+   * fetch the SnapshotEdge for the given request from storage.
+   * the storage data type should also be converted into SKeyValue.
+   * note that the return type is a sequence rather than a single SKeyValue for simplicity,
+   * even though there is an assertion that sequence.length == 1.
+   * @param request
+   * @return
+   */
+  def fetchSnapshotEdgeKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
-  def snapshotEdgeDeserializer: StorageDeserializable[SnapshotEdge]
+  /**
+   * write requestKeyValue into storage only if the value currently stored matches the expected one.
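+   * (effectively a compare-and-set on the snapshot edge, with expectedOpt as the comparison value.)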
+   * note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
+   *
+   * The most important thing is that this has to be an atomic operation.
+   * While this operation is mutating requestKeyValue's snapshotEdge, other threads need to be
+   * either blocked or failed on the write-write conflict case.
+   *
+   * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized with it to
+   * prevent reads from seeing wrong data.
+   *
+   * It is best to use the storage's own concurrency control (either pessimistic or optimistic),
+   * such as transactions or compareAndSet, to synchronize.
+   *
+   * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee atomicity.
+   * a storage that does not support concurrency control can instead
+   * maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
+   * and write (writeLock).
+   * @param requestKeyValue
+   * @param expectedOpt
+   * @return
+   */
+  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean]
-  def indexEdgeDeserializer: StorageDeserializable[IndexEdge]
+  /**
+   * build the storage-specific request used by fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
+   * for example, Asynchbase uses GetRequest and Scanner, so this method is responsible for building
+   * the client request (GetRequest, Scanner) based on the user-provided query.
+   * @param queryRequest
+   * @return
+   */
+  def buildRequest(queryRequest: QueryRequest): AnyRef
-  def vertexDeserializer: StorageDeserializable[Vertex]
+  /**
+   * fetch IndexEdges for the given queryParam in queryRequest.
+   * this expects the previous step's starting score so the score can propagate into the next step.
+   * parentEdges is also necessary to return the full bfs tree when the query requires it.
+   *
+   * note that the return type is the general type parameter R.
+   * for example, we currently use Asynchbase,
+   * so the single-I/O return type should be Deferred[T].
+   *
+   * if we used the native hbase client, this return type could be Future[T] or just T.
+   * @param queryRequest
+   * @param prevStepScore
+   * @param isInnerCall
+   * @param parentEdges
+   * @return
+   */
+  def fetch(queryRequest: QueryRequest,
+            prevStepScore: Double,
+            isInnerCall: Boolean,
+            parentEdges: Seq[EdgeWithScore]): R
-  // Interface
-  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]]
+  /**
+   * responsible for firing parallel fetch calls into storage and creating a future that returns the merged result.
+   * @param queryRequestWithScoreLs
+   * @param prevStepEdges
+   * @return
+   */
+  def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]]
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]]
+  /**
+   * fetch the Vertex for the given request from storage.
+   * @param request
+   * @return
+   */
+  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]]
+  /**
+   * decide how to apply the given edges (indexProps values + Map(_count -> countVal)) to storage.
+   * @param edges
+   * @param withWait
+   * @return
+   */
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]]
+
+  /**
+   * this method needs to be called on client shutdown. it is responsible for cleaning up resources
+   * such as the client connection to storage.
+   */
+  def flush(): Unit
+
+  /**
+   * create a table on storage.
+ * if storage implementation does not support namespace or table, then there is nothing to be done + * @param zkAddr + * @param tableName + * @param cfs + * @param regionMultiplier + * @param ttl + * @param compressionAlgorithm + */ + def createTable(zkAddr: String, + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String): Unit + + + + + + /** Public Interface */ + + def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { + def fromResult(queryParam: QueryParam, + kvs: Seq[SKeyValue], + version: String): Option[Vertex] = { + if (kvs.isEmpty) None + else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None) + } + + val futures = vertices.map { vertex => + val queryParam = QueryParam.Empty + val q = Query.toQuery(Seq(vertex), queryParam) + val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs => + fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion) + } recoverWith { case ex: Throwable => + Future.successful(None) + } + } + + Future.sequence(futures).map { result => result.toList.flatten } + } def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] = { - val futures = elements.map { - case edge: Edge => mutateEdge(edge, withWait) - case vertex: Vertex => mutateVertex(vertex, withWait) - case element => throw new RuntimeException(s"$element is not edge/vertex") + + val edgeBuffer = ArrayBuffer[Edge]() + val vertexBuffer = ArrayBuffer[Vertex]() + + elements.foreach { + case e: Edge => edgeBuffer += e + case v: Vertex => vertexBuffer += v + case any@_ => logger.error(s"Unknown type: ${any}") + } + + val edgeFuture = mutateEdges(edgeBuffer, withWait) + val vertexFuture = mutateVertices(vertexBuffer, withWait) + + val graphFuture = for { + edgesMutated <- edgeFuture + verticesMutated <- vertexFuture + } yield edgesMutated ++ verticesMutated + + graphFuture + } + + def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { + val (strongEdges, weakEdges) = + (edges.partition(e => e.label.consistencyLevel == "strong" || e.op == GraphUtil.operations("insertBulk"))) + + val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) => + val mutations = edges.flatMap { edge => + val (_, edgeUpdate) = + if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge) + else Edge.buildOperation(None, Seq(edge)) + buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++ + snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) + } + writeToStorage(zkQuorum, mutations, withWait) + } + val strongEdgesFutures = mutateStrongEdges(strongEdges, withWait) + for { + weak <- Future.sequence(weakEdgesFutures) + strong <- strongEdgesFutures + } yield { + strong ++ weak } - Future.sequence(futures) } + def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { + + val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq + val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => + val (deleteAllEdges, edges) = edgeGroup.partition(_.op == GraphUtil.operations("deleteAll")) - def mutateEdge(edge: Edge, withWait: Boolean): Future[Boolean] + // DeleteAll first + val deleteAllFutures = deleteAllEdges.map { edge => + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) + } - def mutateEdges(edges: Seq[Edge], 
withWait: Boolean = false): Future[Seq[Boolean]] + // After deleteAll, process others + lazy val mutateEdgeFutures = edges.toList match { + case head :: tail => + // val strongConsistency = edges.head.label.consistencyLevel == "strong" + // if (strongConsistency) { + val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait)(Edge.buildOperation) -// def mutateEdges(edges: Seq[Edge], -// withWait: Boolean = false): Future[Seq[Boolean]] = { -// val futures = edges.map { edge => mutateEdge(edge, withWait) } -// Future.sequence(futures) -// } + //TODO: decide what we will do on failure on vertex put + val puts = buildVertexPutsAsync(head) + val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, withWait) + Seq(edgeFuture, vertexFuture) + // } else { + // edges.map { edge => mutateEdge(edge, withWait = withWait) } + // } + case Nil => Nil + } - def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] + val composed = for { + deleteRet <- Future.sequence(deleteAllFutures) + mutateRet <- Future.sequence(mutateEdgeFutures) + } yield deleteRet ++ mutateRet + + composed.map(_.forall(identity)) + } + + Future.sequence(mutateEdges) + } + + def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = { + if (vertex.op == GraphUtil.operations("delete")) { + writeToStorage(vertex.hbaseZkAddr, + vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) + } else if (vertex.op == GraphUtil.operations("deleteAll")) { + logger.info(s"deleteAll for vertex is truncated. $vertex") + Future.successful(true) // Ignore withWait parameter, because deleteAll operation may takes long time + } else { + writeToStorage(vertex.hbaseZkAddr, buildPutsAll(vertex), withWait) + } + } def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = { @@ -66,30 +326,272 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { Future.sequence(futures) } - def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] + def mutateEdgesInner(edges: Seq[Edge], + checkConsistency: Boolean, + withWait: Boolean)(f: (Option[Edge], Seq[Edge]) => (Edge, EdgeMutate)): Future[Boolean] = { + if (!checkConsistency) { + val zkQuorum = edges.head.label.hbaseZkAddr + val futures = edges.map { edge => + val (_, edgeUpdate) = f(None, Seq(edge)) + val mutations = + indexedEdgeMutations(edgeUpdate) ++ + snapshotEdgeMutations(edgeUpdate) ++ + increments(edgeUpdate) + writeToStorage(zkQuorum, mutations, withWait) + } + Future.sequence(futures).map { rets => rets.forall(identity) } + } else { + def commit(_edges: Seq[Edge], statusCode: Byte): Future[Boolean] = { - def flush(): Unit + fetchSnapshotEdge(_edges.head) flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => - def createTable(zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String): Unit + val (newEdge, edgeUpdate) = f(snapshotEdgeOpt, _edges) + logger.debug(s"${snapshotEdgeOpt}\n${edgeUpdate.toLogString}") + //shouldReplace false. 
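+          // no new snapshot edge to write and statusCode 0 means nothing was partially
+          // committed either, so this request is a no-op and can safely be dropped.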
+ if (edgeUpdate.newSnapshotEdge.isEmpty && statusCode <= 0) { + logger.debug(s"${newEdge.toLogString} drop.") + Future.successful(true) + } else { + commitUpdate(newEdge, statusCode)(snapshotEdgeOpt, kvOpt, edgeUpdate).map { ret => + if (ret) { + logger.info(s"[Success] commit: \n${_edges.map(_.toLogString).mkString("\n")}") + } else { + throw new PartialFailureException(newEdge, 3, "commit failed.") + } + true + } + } + } + } + def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte)(fn: (Seq[Edge], Byte) => Future[Boolean]): Future[Boolean] = { + if (tryNum >= MaxRetryNum) { + edges.foreach { edge => + logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") + ExceptionHandler.enqueue(ExceptionHandler.toKafkaMessage(element = edge)) + } + Future.successful(false) + } else { + val future = fn(edges, statusCode) + future.onSuccess { + case success => + logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n") + } + future recoverWith { + case FetchTimeoutException(retryEdge) => + logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") + retry(tryNum + 1)(edges, statusCode)(fn) + + case PartialFailureException(retryEdge, failedStatusCode, faileReason) => + val status = failedStatusCode match { + case 0 => "AcquireLock failed." + case 1 => "Mutation failed." + case 2 => "Increment failed." + case 3 => "ReleaseLock failed." + case 4 => "Unknown" + } + + Thread.sleep(Random.nextInt(MaxBackOff)) + logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") + retry(tryNum + 1)(Seq(retryEdge), failedStatusCode)(fn) + case ex: Exception => + logger.error("Unknown exception", ex) + Future.successful(false) + } + } + } + retry(1)(edges, 0)(commit) + } + } + + def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge], + newEdge: Edge, edgeMutate: EdgeMutate) = + Seq("----------------------------------------------", + s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", + s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", + s"newEdge: ${newEdge.toLogString}", + s"mutation: \n${edgeMutate.toLogString}", + "----------------------------------------------").mkString("\n") + + + /** Delete All */ + protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest, + queryResult: QueryResult, + requestTs: Long, + retryNum: Int): Future[Boolean] = { + val queryParam = queryRequest.queryParam + val zkQuorum = queryParam.label.hbaseZkAddr + val futures = for { + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + } yield { + /** reverted direction */ + val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + buildIncrementsAsync(indexEdge, -1L) + } + val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge => + indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + buildIncrementsAsync(indexEdge, -1L) + } + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + writeToStorage(zkQuorum, mutations, withWait = true) + } + Future.sequence(futures).map { rets => rets.forall(identity) } + } + + protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): 
QueryResult = { + val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get + val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore => + (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree + }.map { edgeWithScore => + val label = queryRequest.queryParam.label + val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { + case "strong" => + val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ + Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) + (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) + case _ => + val oldEdge = edgeWithScore.edge + (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) + } + + val copiedEdge = + edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) + + val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) +// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") + edgeToDelete + } + + queryResult.copy(edgeWithScoreLs = edgeWithScoreLs) + } + + protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = { + val queryResultLs = queryRequestWithResultLs.map(_.queryResult) + queryResultLs.foreach { queryResult => + if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.") + } + val futures = for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get + deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs) + if deleteQueryResult.edgeWithScoreLs.nonEmpty + } yield { + val label = queryRequest.queryParam.label + label.schemaVersion match { + case HBaseType.VERSION3 => + if (label.consistencyLevel == "strong") { + /** + * read: snapshotEdge on queryResult = O(N) + * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) + */ + mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) + } else { + deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) + } + case _ => + + /** + * read: x + * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) + */ + deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) + } + } + + if (futures.isEmpty) { + // all deleted. 
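+      // the tuple is (allDeleted, success); an empty fetch means everything was already deleted.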
+ Future.successful(true -> true) + } else { + Future.sequence(futures).map { rets => false -> rets.forall(identity) } + } + } + + protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = { + val future = for { + queryRequestWithResultLs <- getEdges(query) + (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs) + } yield { +// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") + (allDeleted, ret) + } + + Extensions.retryOnFailure(MaxRetryNum) { + future + } { + logger.error(s"fetch and deleteAll failed.") + (true, false) + } + + } + + def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], + labels: Seq[Label], + dir: Int, + ts: Long): Future[Boolean] = { + + def enqueueLogMessage() = { + val kafkaMessages = for { + vertice <- srcVertices + id = vertice.innerId.toIdString() + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") + val topic = ExceptionHandler.failTopic + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + + ExceptionHandler.enqueues(kafkaMessages) + } + + val requestTs = ts + val queryParams = for { + label <- labels + } yield { + val labelWithDir = LabelWithDirection(label.id.get, dir) + QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw)) + } + + val step = Step(queryParams.toList) + val q = Query(srcVertices, Vector(step)) + + // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { + val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) { + fetchAndDeleteAll(q, requestTs) + } { case (allDeleted, deleteSuccess) => + allDeleted && deleteSuccess + }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } + + retryFuture onFailure { + case ex => + logger.error(s"[Error]: deleteAllAdjacentEdges failed.") + enqueueLogMessage() + } + + retryFuture + } + + /** End Of Delete All */ + + + + + /** Parsing Logic: parse from kv from Storage into Edge */ def toEdge[K: CanSKeyValue](kv: K, queryParam: QueryParam, cacheElementOpt: Option[IndexEdge], parentEdges: Seq[EdgeWithScore]): Option[Edge] = { +// logger.debug(s"toEdge: $kv") try { - val indexEdge = indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) - - Option(indexEdge.toEdge.copy(parentEdges = parentEdges)) + val indexEdgeOpt = indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) + indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges)) } catch { case ex: Exception => - logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}") + logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}", ex) None } } @@ -99,19 +601,24 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { cacheElementOpt: Option[SnapshotEdge] = None, isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore]): Option[Edge] = { - val snapshotEdge = snapshotEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) +// logger.debug(s"SnapshottoEdge: $kv") + val snapshotEdgeOpt = snapshotEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) if (isInnerCall) { - val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) - if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) - else None - } else { - if 
(Edge.allPropsDeleted(snapshotEdge.props)) None - else { + snapshotEdgeOpt.flatMap { snapshotEdge => val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) else None } + } else { + snapshotEdgeOpt.flatMap { snapshotEdge => + if (Edge.allPropsDeleted(snapshotEdge.props)) None + else { + val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges) + if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge) + else None + } + } } } @@ -126,7 +633,7 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { val kv = first val cacheElementOpt = if (queryParam.isSnapshotEdge) None - else Option(indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)) + else indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None) for { kv <- kvs @@ -145,6 +652,526 @@ abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { } } + /** End Of Parse Logic */ + +// /** methods for consistency */ +// protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { +// if (elementRpcs.isEmpty) { +// Future.successful(true) +// } else { +// val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) } +// Future.sequence(futures).map(_.forall(identity)) +// } +// } + + case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends Exception + + protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { + val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") + logger.debug(msg) + } + + protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { + val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", + s"${edgeMutate.toLogString}").mkString("\n") + logger.debug(msg) + } + + protected def buildLockEdge(snapshotEdgeOpt: Option[Edge], edge: Edge, kvOpt: Option[SKeyValue]) = { + val currentTs = System.currentTimeMillis() + val lockTs = snapshotEdgeOpt match { + case None => Option(currentTs) + case Some(snapshotEdge) => + snapshotEdge.pendingEdgeOpt match { + case None => Option(currentTs) + case Some(pendingEdge) => pendingEdge.lockTs + } + } + val newVersion = kvOpt.map(_.timestamp).getOrElse(edge.ts) + 1 + // snapshotEdgeOpt.map(_.version).getOrElse(edge.ts) + 1 + val pendingEdge = edge.copy(version = newVersion, statusCode = 1, lockTs = lockTs) + val base = snapshotEdgeOpt match { + case None => + // no one ever mutated on this snapshotEdge. + edge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) + case Some(snapshotEdge) => + // there is at least one mutation have been succeed. 
+ snapshotEdgeOpt.get.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) + } + base.copy(version = newVersion, statusCode = 1, lockTs = None) + } + + protected def buildReleaseLockEdge(snapshotEdgeOpt: Option[Edge], lockEdge: SnapshotEdge, + edgeMutate: EdgeMutate) = { + val newVersion = lockEdge.version + 1 + val base = edgeMutate.newSnapshotEdge match { + case None => + // shouldReplace false + assert(snapshotEdgeOpt.isDefined) + snapshotEdgeOpt.get.toSnapshotEdge + case Some(newSnapshotEdge) => newSnapshotEdge + } + base.copy(version = newVersion, statusCode = 0, pendingEdgeOpt = None) + } + + protected def acquireLock(statusCode: Byte, + edge: Edge, + oldSnapshotEdgeOpt: Option[Edge], + lockEdge: SnapshotEdge, + oldBytes: Array[Byte]): Future[Boolean] = { + if (statusCode >= 1) { + logger.debug(s"skip acquireLock: [$statusCode]\n${edge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) throw new PartialFailureException(edge, 0, s"$p") + else { + val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head + val oldPut = oldSnapshotEdgeOpt.map(e => snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head) +// val lockEdgePut = buildPutAsync(lockEdge).head +// val oldPut = oldSnapshotEdgeOpt.map(e => buildPutAsync(e.toSnapshotEdge).head) + writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception => + logger.error(s"AcquireLock RPC Failed.") + throw new PartialFailureException(edge, 0, "AcquireLock RPC Failed") + }.map { ret => + if (ret) { + val log = Seq( + "\n", + "=" * 50, + s"[Success]: acquireLock", + s"[RequestEdge]: ${edge.toLogString}", + s"[LockEdge]: ${lockEdge.toLogString()}", + s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", + "=" * 50, "\n").mkString("\n") + + logger.debug(log) + // debug(ret, "acquireLock", edge.toSnapshotEdge) + } else { + throw new PartialFailureException(edge, 0, "hbase fail.") + } + true + } + } + } + } + + + + protected def releaseLock(predicate: Boolean, + edge: Edge, + lockEdge: SnapshotEdge, + releaseLockEdge: SnapshotEdge, + _edgeMutate: EdgeMutate, + oldBytes: Array[Byte]): Future[Boolean] = { + if (!predicate) { + throw new PartialFailureException(edge, 3, "predicate failed.") + } + val p = Random.nextDouble() + if (p < FailProb) throw new PartialFailureException(edge, 3, s"$p") + else { + val releaseLockEdgePut = snapshotEdgeSerializer(releaseLockEdge).toKeyValues.head + val lockEdgePut = snapshotEdgeSerializer(lockEdge).toKeyValues.head + writeLock(releaseLockEdgePut, Option(lockEdgePut)).recoverWith { + case ex: Exception => + logger.error(s"ReleaseLock RPC Failed.") + throw new PartialFailureException(edge, 3, "ReleaseLock RPC Failed") + }.map { ret => + if (ret) { + debug(ret, "releaseLock", edge.toSnapshotEdge) + } else { + val msg = Seq("\nFATAL ERROR\n", + "=" * 50, + oldBytes.toList, + lockEdgePut, + releaseLockEdgePut, + // lockEdgePut.value.toList, + // releaseLockEdgePut.value().toList, + "=" * 50, + "\n" + ) + logger.error(msg.mkString("\n")) + // error(ret, "releaseLock", edge.toSnapshotEdge) + throw new PartialFailureException(edge, 3, "hbase fail.") + } + true + } + } + Future.successful(true) + } -} + protected def mutate(predicate: Boolean, + edge: Edge, + statusCode: Byte, + _edgeMutate: EdgeMutate): Future[Boolean] = { + if (!predicate) throw new PartialFailureException(edge, 1, "predicate failed.") + + if (statusCode >= 2) { + logger.debug(s"skip mutate: [$statusCode]\n${edge.toLogString}") + Future.successful(true) + } 
else {
+      val p = Random.nextDouble()
+      if (p < FailProb) throw new PartialFailureException(edge, 1, s"$p")
+      else
+        writeToStorage(edge.label.hbaseZkAddr, indexedEdgeMutations(_edgeMutate), withWait = true).map { ret =>
+          if (ret) {
+            debug(ret, "mutate", edge.toSnapshotEdge, _edgeMutate)
+          } else {
+            throw new PartialFailureException(edge, 1, "hbase fail.")
+          }
+          true
+        }
+    }
+  }
+
+  protected def increment(predicate: Boolean,
+                          edge: Edge,
+                          statusCode: Byte, _edgeMutate: EdgeMutate): Future[Boolean] = {
+    if (!predicate) throw new PartialFailureException(edge, 2, "predicate failed.")
+    if (statusCode >= 3) {
+      logger.debug(s"skip increment: [$statusCode]\n${edge.toLogString}")
+      Future.successful(true)
+    } else {
+      val p = Random.nextDouble()
+      if (p < FailProb) throw new PartialFailureException(edge, 2, s"$p")
+      else
+        writeToStorage(edge.label.hbaseZkAddr, increments(_edgeMutate), withWait = true).map { ret =>
+          if (ret) {
+            debug(ret, "increment", edge.toSnapshotEdge, _edgeMutate)
+          } else {
+            throw new PartialFailureException(edge, 2, "hbase fail.")
+          }
+          true
+        }
+    }
+  }
+
+
+  /** this may be overridden by a specific storage implementation */
+  protected def commitProcess(edge: Edge, statusCode: Byte)
+                             (snapshotEdgeOpt: Option[Edge], kvOpt: Option[SKeyValue])
+                             (lockEdge: SnapshotEdge, releaseLockEdge: SnapshotEdge, _edgeMutate: EdgeMutate): Future[Boolean] = {
+    val oldBytes = kvOpt.map(kv => kv.value).getOrElse(Array.empty[Byte])
+    for {
+      locked <- acquireLock(statusCode, edge, snapshotEdgeOpt, lockEdge, oldBytes)
+      mutated <- mutate(locked, edge, statusCode, _edgeMutate)
+      incremented <- increment(mutated, edge, statusCode, _edgeMutate)
+      released <- releaseLock(incremented, edge, lockEdge, releaseLockEdge, _edgeMutate, oldBytes)
+    } yield {
+      released
+    }
+  }
+
+  protected def commitUpdate(edge: Edge,
+                             statusCode: Byte)(snapshotEdgeOpt: Option[Edge],
+                                               kvOpt: Option[SKeyValue],
+                                               edgeUpdate: EdgeMutate): Future[Boolean] = {
+    val label = edge.label
+    def oldBytes = kvOpt.map(_.value).getOrElse(Array.empty)
+
+    val lockEdge = buildLockEdge(snapshotEdgeOpt, edge, kvOpt)
+    val releaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, edgeUpdate)
+    val _process = commitProcess(edge, statusCode)(snapshotEdgeOpt, kvOpt)_
+    snapshotEdgeOpt match {
+      case None =>
+        // no one has ever succeeded in acquiring the lock on this snapshot edge.
+        _process(lockEdge, releaseLockEdge, edgeUpdate)
+        // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode)
+      case Some(snapshotEdge) =>
+        // the lock has been acquired successfully at least once.
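+        // dispatch on the lock state below: no pendingEdge means unlocked; an expired pendingEdge
+        // is re-resolved from scratch; our own pendingEdge (same ts) is a self-lock we can resume;
+        // anything else is contention that fails this attempt.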
+ snapshotEdge.pendingEdgeOpt match { + case None => + // not locked + _process(lockEdge, releaseLockEdge, edgeUpdate) + // process(lockEdge, releaseLockEdge, edgeUpdate, statusCode) + case Some(pendingEdge) => + def isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() + if (isLockExpired) { + val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) + val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(pendingEdge)) + val newLockEdge = buildLockEdge(snapshotEdgeOpt, pendingEdge, kvOpt) + val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, newLockEdge, newEdgeUpdate) + commitProcess(edge, statusCode = 0)(snapshotEdgeOpt, kvOpt)(newLockEdge, newReleaseLockEdge, newEdgeUpdate).flatMap { ret => + // process(newLockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode = 0).flatMap { ret => + val log = s"[Success]: Resolving expired pending edge.\n${pendingEdge.toLogString}" + throw new PartialFailureException(edge, 0, log) + } + } else { + // locked + if (pendingEdge.ts == edge.ts && statusCode > 0) { + // self locked + val oldSnapshotEdge = if (snapshotEdge.ts == pendingEdge.ts) None else Option(snapshotEdge) + val (_, newEdgeUpdate) = Edge.buildOperation(oldSnapshotEdge, Seq(edge)) + val newReleaseLockEdge = buildReleaseLockEdge(snapshotEdgeOpt, lockEdge, newEdgeUpdate) + + /** lockEdge will be ignored */ + _process(lockEdge, newReleaseLockEdge, newEdgeUpdate) + // process(lockEdge, newReleaseLockEdge, newEdgeUpdate, statusCode) + } else { + throw new PartialFailureException(edge, statusCode, s"others[${pendingEdge.ts}] is mutating. me[${edge.ts}]") + } + } + } + } + } + + /** end of methods for consistency */ + + + // def futureCache[T] = Cache[Long, (Long, T)] + + protected def toRequestEdge(queryRequest: QueryRequest): Edge = { + val srcVertex = queryRequest.vertex + // val tgtVertexOpt = queryRequest.tgtVertexOpt + val edgeCf = Serializable.edgeCf + val queryParam = queryRequest.queryParam + val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt + val label = queryParam.label + val labelWithDir = queryParam.labelWithDir + val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) + val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match { + case Some(tgtVertexId) => // _to is given. 
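+        // both endpoint ids are converted into the label's schema version before building the request edge.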
+ /** we use toSnapshotEdge so dont need to swap src, tgt */ + val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) + val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion) + (src, tgt) + case None => + val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) + (src, src) + } + + val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId)) + val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) + val currentTs = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)).toMap + Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs) + } + + + + protected def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = { + val labelWithDir = edge.labelWithDir + val queryParam = QueryParam(labelWithDir) + val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId)) + val q = Query.toQuery(Seq(edge.srcVertex), _queryParam) + val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam) + + fetchSnapshotEdgeKeyValues(buildRequest(queryRequest)).map { kvs => + val (edgeOpt, kvOpt) = + if (kvs.isEmpty) (None, None) + else { + val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, parentEdges = Nil).headOption.map(_.edge) + val _kvOpt = kvs.headOption + (_edgeOpt, _kvOpt) + } + (queryParam, edgeOpt, kvOpt) + } recoverWith { case ex: Throwable => + logger.error(s"fetchQueryParam failed. fallback return.", ex) + throw new FetchTimeoutException(s"${edge.toLogString}") + } + } + + protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = { + if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil) + else { + val queryRequest = queryRequestWithResultsLs.head.queryRequest + val q = orgQuery + val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult) + + val stepIdx = queryRequest.stepIdx + 1 + + val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None + val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) + val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) + val step = q.steps(stepIdx) + val alreadyVisited = + if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean] + else Graph.alreadyVisitedVertices(queryResultsLs) + + val groupedBy = queryResultsLs.flatMap { queryResult => + queryResult.edgeWithScoreLs.map { case edgeWithScore => + edgeWithScore.edge.tgtVertex -> edgeWithScore + } + }.groupBy { case (vertex, edgeWithScore) => vertex } + + val groupedByFiltered = for { + (vertex, edgesWithScore) <- groupedBy + aggregatedScore = edgesWithScore.map(_._2.score).sum if aggregatedScore >= prevStepThreshold + } yield vertex -> aggregatedScore + + val prevStepTgtVertexIdEdges = for { + (vertex, edgesWithScore) <- groupedBy + } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) => edgeWithScore } + + val nextStepSrcVertices = if (prevStepLimit >= 0) { + groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) + } else { + groupedByFiltered.toSeq + } + + val queryRequests = for { + (vertex, prevStepScore) <- nextStepSrcVertices + queryParam <- step.queryParams + } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore) + + Graph.filterEdges(fetches(queryRequests, 
prevStepTgtVertexIdEdges), alreadyVisited)(ec) + } + } + + protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = { + for { + queryRequestWithResultLs <- queryRequestWithResultLsFuture + ret <- fetchStep(orgQuery, queryRequestWithResultLs) + } yield ret + } + + def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = { + val fallback = { + val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty) + Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult()))) + } + Try { + + if (q.steps.isEmpty) { + // TODO: this should be get vertex query. + fallback + } else { + // current stepIdx = -1 + val startQueryResultLs = QueryResult.fromVertices(q) + q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) => + fetchStepFuture(q, acc) +// fetchStepFuture(q, acc).map { stepResults => +// step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult) => +// val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor) +// qParam.cursorOpt = Option(cursor) +// } +// stepResults +// } + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = { + val ts = System.currentTimeMillis() + val futures = for { + (srcVertex, tgtVertex, queryParam) <- params + propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, queryParam.label.schemaVersion)) + edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs) + } yield { + fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => + val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId)) + val q = Query.toQuery(Seq(edge.srcVertex), _queryParam) + val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam) + val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))) + QueryRequestWithResult(queryRequest, queryResult) + } + } + + Future.sequence(futures) + } + + + + @tailrec + final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { + if (range < sampleNumber || set.size == sampleNumber) set + else randomInt(sampleNumber, range, set + Random.nextInt(range)) + } + + protected def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { + if (edges.size <= n){ + edges + }else{ + val plainEdges = if (queryRequest.queryParam.offset == 0) { + edges.tail + } else edges + + val randoms = randomInt(n, plainEdges.size) + var samples = List.empty[EdgeWithScore] + var idx = 0 + plainEdges.foreach { e => + if (randoms.contains(idx)) samples = e :: samples + idx += 1 + } + samples.toSeq + } + + } + /** end of query */ + + /** Mutation Builder */ + + + /** EdgeMutate */ + def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = { + val deleteMutations = edgeMutate.edgesToDelete.flatMap { indexEdge => + indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) + } + val insertMutations = edgeMutate.edgesToInsert.flatMap { indexEdge => + indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + } + + deleteMutations ++ insertMutations + } + + def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = + edgeMutate.newSnapshotEdge.map(e 
+
+  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+    (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
+      case (true, true) =>
+        /** nothing to delete and nothing to insert; shouldUpdate == false, so no degree change */
+        List.empty
+      case (true, false) =>
+        /** no edges to delete, but there are new edges to insert, so increase the degree by 1 */
+        edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
+      case (false, true) =>
+        /** no edges to insert, but there are old edges to delete, so decrease the degree by 1 */
+        edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
+      case (false, false) =>
+        /** an update of existing edges, so the degree does not change */
+        List.empty
+    }
+
+  /** IndexEdge */
+  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
+    val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer))
+    val _indexedEdge = indexedEdge.copy(props = newProps)
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment))
+  }
+
+  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
+    val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer))
+    val _indexedEdge = indexedEdge.copy(props = newProps)
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment))
+  }
+
+  def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = {
+    val kvs = vertexSerializer(vertex).toKeyValues
+    val kv = kvs.head
+    vertex.belongLabelIds.map { id =>
+      kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)), operation = SKeyValue.Delete)
+    }
+  }
+
+  def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] =
+    if (edge.op == GraphUtil.operations("delete"))
+      buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
+    else
+      vertexSerializer(edge.srcForVertex).toKeyValues ++ vertexSerializer(edge.tgtForVertex).toKeyValues
+
+  def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
+    vertex.op match {
+      case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
+      case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
index 791a080..4b3300a 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/StorageDeserializable.scala
@@ -1,7 +1,9 @@
 package com.kakao.s2graph.core.storage
 
-import com.kakao.s2graph.core.{Edge, IndexEdge, QueryParam}
+import com.kakao.s2graph.core.utils.logger
+import com.kakao.s2graph.core.QueryParam
 import com.kakao.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
+import org.apache.hadoop.hbase.util.Bytes
 
 object StorageDeserializable {
   /** Deserializer */
@@ -75,8 +77,19 @@ object StorageDeserializable {
     ret
   }
+
+  def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset)
 }
 
 trait StorageDeserializable[E] {
-  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
+  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
+    try {
+      Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt))
+    } catch {
+      case e: Exception =>
+        logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
+        None
+    }
+  }
+
+  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
 }
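
The StorageDeserializable change above makes decoding total: the public fromKeyValues now catches any decoding exception, logs it, and returns None, while concrete decoders override only the throwing fromKeyValuesInner. A minimal sketch of the same wrap-the-throwing-core pattern; SafeDeserializer and its names are illustrative, not from the codebase:

trait SafeDeserializer[E] {
  /** May throw on corrupt input; implemented by each concrete decoder. */
  protected def decodeInner(bytes: Array[Byte]): E

  /** Total wrapper: a failed decode degrades to None instead of failing the whole read. */
  def decode(bytes: Array[Byte]): Option[E] =
    try Option(decodeInner(bytes))
    catch {
      case e: Exception =>
        Console.err.println(s"${getClass.getName} decode failed: $e") // stand-in for logger.error
        None
    }
}

object IntDeserializer extends SafeDeserializer[Int] {
  protected def decodeInner(bytes: Array[Byte]): Int = new String(bytes, "UTF-8").toInt
}

// IntDeserializer.decode("42".getBytes)   => Some(42)
// IntDeserializer.decode("oops".getBytes) => None (logged, not thrown)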

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
new file mode 100644
index 0000000..699981d
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala
@@ -0,0 +1,46 @@
+package com.kakao.s2graph.core.storage
+
+import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import com.kakao.s2graph.core.{QueryParam, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.collection.mutable.ListBuffer
+
+class VertexDeserializable extends Deserializable[Vertex] {
+  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                          _kvs: Seq[T],
+                                          version: String,
+                                          cacheElementOpt: Option[Vertex]): Vertex = {
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
+
+    var maxTs = Long.MinValue
+    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
+    val belongLabelIds = new ListBuffer[Int]
+
+    for {
+      kv <- kvs
+    } {
+      val propKey =
+        if (kv.qualifier.length == 1) kv.qualifier.head.toInt
+        else Bytes.toInt(kv.qualifier)
+
+      val ts = kv.timestamp
+      if (ts > maxTs) maxTs = ts
+
+      if (Vertex.isLabelId(propKey)) {
+        belongLabelIds += Vertex.toLabelId(propKey)
+      } else {
+        val v = kv.value
+        val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
+        propsMap += (propKey -> value)
+      }
+    }
+    assert(maxTs != Long.MinValue)
+    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
+  }
+}
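
VertexDeserializable folds a row's cells back into one Vertex: the row key holds the vertex id, each qualifier is either a one-byte property key or a four-byte big-endian int, label-id qualifiers are collected as belongs-to labels, and the vertex timestamp is the maximum cell timestamp. A small runnable sketch of just the qualifier rule, with java.nio.ByteBuffer standing in for HBase's Bytes.toInt (QualifierSketch is illustrative):

import java.nio.ByteBuffer

object QualifierSketch {
  // One-byte qualifiers encode the property key directly; anything longer
  // is read as a big-endian Int, matching what Bytes.toInt would return.
  def propKeyOf(qualifier: Array[Byte]): Int =
    if (qualifier.length == 1) qualifier.head.toInt
    else ByteBuffer.wrap(qualifier).getInt

  def main(args: Array[String]): Unit = {
    println(propKeyOf(Array(7.toByte)))                            // 7
    println(propKeyOf(ByteBuffer.allocate(4).putInt(1234).array))  // 1234
  }
}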

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
new file mode 100644
index 0000000..bda909d
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala
@@ -0,0 +1,18 @@
+package com.kakao.s2graph.core.storage
+
+import com.kakao.s2graph.core.Vertex
+import org.apache.hadoop.hbase.util.Bytes
+
+case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+
+  val cf = Serializable.vertexCf
+
+  override def toKeyValues: Seq[SKeyValue] = {
+    val row = vertex.id.bytes
+    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
+    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+    (base ++ belongsTo).map { case (qualifier, value) =>
+      SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
+    }.toSeq
+  }
+}
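
VertexSerializable is the write-side mirror of that layout: one cell per property, plus one empty-valued marker cell per belongs-to label, all sharing the vertex id as the row key and the vertex timestamp. A simplified sketch of the layout with stand-in types (KV here is illustrative, not the project's SKeyValue):

object VertexLayoutSketch {
  final case class KV(row: Array[Byte], qualifier: Array[Byte], value: Array[Byte], ts: Long)

  // One cell per property plus one empty-valued marker cell per belongs-to
  // label, all keyed by the vertex id bytes.
  def vertexCells(vertexIdBytes: Array[Byte],
                  props: Seq[(Array[Byte], Array[Byte])],
                  belongLabelQualifiers: Seq[Array[Byte]],
                  ts: Long): Seq[KV] = {
    val propCells   = props.map { case (q, v) => KV(vertexIdBytes, q, v, ts) }
    val belongCells = belongLabelQualifiers.map(q => KV(vertexIdBytes, q, Array.empty[Byte], ts))
    propCells ++ belongCells
  }
}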

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e207f676/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseMutationBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseMutationBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseMutationBuilder.scala
deleted file mode 100644
index 504f369..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseMutationBuilder.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-package com.kakao.s2graph.core.storage.hbase
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.storage.{SKeyValue, MutationBuilder}
-import org.apache.hadoop.hbase.util.Bytes
-import org.hbase.async.{DeleteRequest, AtomicIncrementRequest, PutRequest, HBaseRpc}
-
-import scala.collection.Seq
-import scala.concurrent.ExecutionContext
-
-class AsynchbaseMutationBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionContext)
-  extends MutationBuilder[HBaseRpc](storage) {
-
-  def put(kvs: Seq[SKeyValue]): Seq[HBaseRpc] =
-    kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
-
-  def increment(kvs: Seq[SKeyValue]): Seq[HBaseRpc] =
-    kvs.map { kv => new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) }
-
-  def delete(kvs: Seq[SKeyValue]): Seq[HBaseRpc] =
-    kvs.map { kv =>
-      if (kv.qualifier == null) new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp)
-      else new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp)
-    }
-
-  /** EdgeMutate */
-  def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[HBaseRpc] = {
-    val deleteMutations = edgeMutate.edgesToDelete.flatMap(edge => buildDeletesAsync(edge))
-    val insertMutations = edgeMutate.edgesToInsert.flatMap(edge => buildPutsAsync(edge))
-
-    deleteMutations ++ insertMutations
-  }
-
-  def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[HBaseRpc] =
-    edgeMutate.newSnapshotEdge.map(e => buildPutAsync(e)).getOrElse(Nil)
-
-  def increments(edgeMutate: EdgeMutate): Seq[HBaseRpc] = {
-    (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
-      case (true, true) =>
-
-        /** when there is no need to update. shouldUpdate == false */
-        List.empty[AtomicIncrementRequest]
-      case (true, false) =>
-
-        /** no edges to delete but there is new edges to insert so increase degree by 1 */
-        edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
-      case (false, true) =>
-
-        /** no edges to insert but there is old edges to delete so decrease degree by 1 */
-        edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
-      case (false, false) =>
-
-        /** update on existing edges so no change on degree */
-        List.empty[AtomicIncrementRequest]
-    }
-  }
-
-  /** IndexEdge */
-  def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[HBaseRpc] =
-    storage.indexEdgeSerializer(indexedEdge).toKeyValues.headOption match {
-      case None => Nil
-      case Some(kv) =>
-        val copiedKV = kv.copy(qualifier = Array.empty[Byte], value = Bytes.toBytes(amount))
-        increment(Seq(copiedKV))
-    }
-
-  def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[HBaseRpc] =
-    storage.indexEdgeSerializer(indexedEdge).toKeyValues.headOption match {
-      case None => Nil
-      case Some(kv) =>
-        val copiedKV = kv.copy(value = Bytes.toBytes(amount))
-        increment(Seq(copiedKV))
-    }
-
-  def buildDeletesAsync(indexedEdge: IndexEdge): Seq[HBaseRpc] =
-    delete(storage.indexEdgeSerializer(indexedEdge).toKeyValues)
-
-  def buildPutsAsync(indexedEdge: IndexEdge): Seq[HBaseRpc] =
-    put(storage.indexEdgeSerializer(indexedEdge).toKeyValues)
-
-  /** SnapshotEdge */
-  def buildPutAsync(snapshotEdge: SnapshotEdge): Seq[HBaseRpc] =
-    put(storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues)
-
-  def buildDeleteAsync(snapshotEdge: SnapshotEdge): Seq[HBaseRpc] =
-    delete(storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues)
-
-  /** Vertex */
-  def buildPutsAsync(vertex: Vertex): Seq[HBaseRpc] = {
-    val kvs = storage.vertexSerializer(vertex).toKeyValues
-    put(kvs)
-  }
-
-  def buildDeleteAsync(vertex: Vertex): Seq[HBaseRpc] = {
-    val kvs = storage.vertexSerializer(vertex).toKeyValues
-    val kv = kvs.head
-    delete(Seq(kv.copy(qualifier = null)))
-  }
-
-  def buildDeleteBelongsToId(vertex: Vertex): Seq[HBaseRpc] = {
-    val kvs = storage.vertexSerializer(vertex).toKeyValues
-    val kv = kvs.head
-
-    import org.apache.hadoop.hbase.util.Bytes
-    val newKVs = vertex.belongLabelIds.map { id =>
-      kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)))
-    }
-    delete(newKVs)
-  }
-
-  def buildVertexPutsAsync(edge: Edge): Seq[HBaseRpc] =
-    if (edge.op == GraphUtil.operations("delete"))
-      buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
-    else
-      buildPutsAsync(edge.srcForVertex) ++ buildPutsAsync(edge.tgtForVertex)
-}
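
With Storage emitting generic SKeyValue mutations (indexedEdgeMutations and friends above), this asynchbase-specific builder becomes redundant; the only backend-specific work left is translating each SKeyValue into an RPC. A sketch of that translation, reusing the constructors the deleted code already called; the SKeyValue fields and Put/Increment/Delete operation constants are assumed from this commit, with the asynchbase and hbase jars on the classpath:

import org.apache.hadoop.hbase.util.Bytes
import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseRpc, PutRequest}

// One place a backend could turn a generic SKeyValue mutation back into an
// asynchbase RPC, mirroring the put/increment/delete helpers deleted above.
def toRpc(kv: SKeyValue): HBaseRpc = kv.operation match {
  case SKeyValue.Put       => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)
  case SKeyValue.Increment => new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
  case SKeyValue.Delete    =>
    if (kv.qualifier == null) new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp)
    else new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp)
}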
