Repository: incubator-s2graph Updated Branches: refs/heads/master ed9bedf0d -> 128d67c06
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala new file mode 100644 index 0000000..1f7d863 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala @@ -0,0 +1,118 @@ +package org.apache.s2graph.core.storage.hbase + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse, SKeyValue, StorageWritable} +import org.apache.s2graph.core.utils.{Extensions, logger} +import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest} +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseStorageWritable(val client: HBaseClient, + val clientWithFlush: HBaseClient) extends StorageWritable { + import Extensions.DeferOps + + private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client + /** + * decide how to store given key values Seq[SKeyValue] into storage using storage's client. + * note that this should be return true on all success. + * we assumes that each storage implementation has client as member variable. + * + * @param cluster : where this key values should be stored. + * @param kvs : sequence of SKeyValue that need to be stored in storage. + * @param withWait : flag to control wait ack from storage. + * note that in AsynchbaseStorage(which support asynchronous operations), even with true, + * it never block thread, but rather submit work and notified by event loop when storage send ack back. + * @return ack message from storage. + */ + override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { + if (kvs.isEmpty) Future.successful(MutateResponse.Success) + else { + val _client = client(withWait) + val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment) + + /* Asynchbase IncrementRequest does not implement HasQualifiers */ + val incrementsFutures = increments.map { kv => + val countVal = Bytes.toLong(kv.value) + val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal) + val fallbackFn: (Exception => MutateResponse) = { ex => + logger.error(s"mutation failed. $request", ex) + new IncrementResponse(false, -1L, -1L) + } + val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long => + new IncrementResponse(true, resultCount.longValue(), countVal) + }.toFuture(MutateResponse.IncrementFailure) + + if (withWait) future else Future.successful(MutateResponse.IncrementSuccess) + } + + /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */ + val othersFutures = putAndDeletes.groupBy { kv => + (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp) + }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) => + + val durability = groupedKeyValues.head.durability + val qualifiers = new ArrayBuffer[Array[Byte]]() + val values = new ArrayBuffer[Array[Byte]]() + + groupedKeyValues.foreach { kv => + if (kv.qualifier != null) qualifiers += kv.qualifier + if (kv.value != null) values += kv.value + } + val defer = operation match { + case SKeyValue.Put => + val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp) + put.setDurable(durability) + _client.put(put) + case SKeyValue.Delete => + val delete = + if (qualifiers.isEmpty) + new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp) + else + new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp) + delete.setDurable(durability) + _client.delete(delete) + } + if (withWait) { + defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception => + groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) } + MutateResponse.Failure + } + } else Future.successful(MutateResponse.Success) + } + for { + incrementRets <- Future.sequence(incrementsFutures) + otherRets <- Future.sequence(othersFutures) + } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess)) + } + } + + /** + * write requestKeyValue into storage if the current value in storage that is stored matches. + * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. + * + * Most important thing is this have to be 'atomic' operation. + * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be + * either blocked or failed on write-write conflict case. + * + * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to + * prevent wrong data for read. + * + * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, + * compareAndSet to synchronize. + * + * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. + * for storage that does not support concurrency control, then storage implementation + * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) + * and write(writeLock). + * + * @param rpc + * @param expectedOpt + * @return + */ + override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = { + val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp) + val expected = expectedOpt.map(_.value).getOrElse(Array.empty) + client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true) + .map(r => new MutateResponse(isSuccess = r)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Deserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Deserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Deserializable.scala new file mode 100644 index 0000000..349bff3 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Deserializable.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde + +import org.apache.s2graph.core.types.{LabelWithDirection, VertexId} + + +trait Deserializable[E] extends StorageDeserializable[E] { + + type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int) + +// /** version 1 and version 2 share same code for parsing row key part */ +// def parseRow(kv: SKeyValue, version: String = HBaseType.DEFAULT_VERSION): RowKeyRaw = { +// var pos = 0 +// val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) +// pos += srcIdLen +// val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) +// pos += 4 +// val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) +// +// val rowLen = srcIdLen + 4 + 1 +// (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen) +// } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala new file mode 100644 index 0000000..46b4860 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/Serializable.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde + +object Serializable { + val vertexCf = "v".getBytes("UTF-8") + val edgeCf = "e".getBytes("UTF-8") +} + +trait Serializable[E] extends StorageSerializable[E] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala new file mode 100644 index 0000000..dc7690b --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.storage.CanSKeyValue +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs} + +object StorageDeserializable { + /** Deserializer */ + def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { + val byte = bytes(offset) + val isInverted = if ((byte & 1) != 0) true else false + val labelOrderSeq = byte >> 1 + (labelOrderSeq.toByte, isInverted) + } + + def bytesToKeyValues(bytes: Array[Byte], + offset: Int, + length: Int, + schemaVer: String, + serviceColumn: ServiceColumn): (Array[(ColumnMeta, InnerValLike)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(ColumnMeta, InnerValLike)](len) + var i = 0 + while (i < len) { + val kSeq = Bytes.toInt(bytes, pos, 4) + val k = serviceColumn.metasMap(kSeq) + pos += 4 + + val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + def bytesToKeyValues(bytes: Array[Byte], + offset: Int, + length: Int, + schemaVer: String, + label: Label): (Array[(LabelMeta, InnerValLike)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(LabelMeta, InnerValLike)](len) + var i = 0 + while (i < len) { + val k = label.labelMetaMap(bytes(pos)) + pos += 1 + val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + def bytesToKeyValuesWithTs(bytes: Array[Byte], + offset: Int, + schemaVer: String, + label: Label): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len) + var i = 0 + while (i < len) { + val k = label.labelMetaMap(bytes(pos)) + pos += 1 + val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + def bytesToProps(bytes: Array[Byte], + offset: Int, + schemaVer: String): (Array[(LabelMeta, InnerValLike)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(LabelMeta, InnerValLike)](len) + var i = 0 + while (i < len) { + val k = LabelMeta.empty + val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + // logger.error(s"bytesToProps: $kvs") + val ret = (kvs, pos) + + ret + } + + def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset) + + def bytesToInt(bytes: Array[Byte], offset: Int): Int = Bytes.toInt(bytes, offset) +} + +trait StorageDeserializable[E] { + def fromKeyValues[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): Option[E] +// = { +// try { +// Option(fromKeyValuesInner(kvs, cacheElementOpt)) +// } catch { +// case e: Exception => +// logger.error(s"${this.getClass.getName} fromKeyValues failed.", e) +// None +// } +// } +// def fromKeyValuesInner[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): E +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala new file mode 100644 index 0000000..219d097 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta} +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs} + +object StorageSerializable { + /** serializer */ + def propsToBytes(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes) + bytes + } + + def vertexPropsToBytes(props: Seq[(ColumnMeta, Array[Byte])]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((k, v) <- props) bytes = Bytes.add(bytes, Bytes.toBytes(k.seq.toInt), v) + bytes + } + + def propsToKeyValues(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes) + bytes + } + + def propsToKeyValuesWithTs(props: Seq[(LabelMeta, InnerValLikeWithTs)]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes) + bytes + } + + def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { + assert(labelOrderSeq < (1 << 6)) + val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) + Array.fill(1)(byte.toByte) + } + + def intToBytes(value: Int): Array[Byte] = Bytes.toBytes(value) + + def longToBytes(value: Long): Array[Byte] = Bytes.toBytes(value) +} + +trait StorageSerializable[E] { + val cf = Serializable.edgeCf + + def table: Array[Byte] + def ts: Long + + def toRowKey: Array[Byte] + def toQualifier: Array[Byte] + def toValue: Array[Byte] + + def toKeyValues: Seq[SKeyValue] = { + val row = toRowKey + val qualifier = toQualifier + val value = toValue + val kv = SKeyValue(table, row, cf, qualifier, value, ts) +// logger.debug(s"[SER]: ${kv.toLogString}}") + Seq(kv) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 3da8267..2501ed9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -22,16 +22,18 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} -import org.apache.s2graph.core.storage.StorageDeserializable._ -import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} +import org.apache.s2graph.core.storage.serde._ +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.CanSKeyValue import org.apache.s2graph.core.types._ object IndexEdgeDeserializable{ def getNewInstance(graph: S2Graph) = new IndexEdgeDeserializable(graph) } class IndexEdgeDeserializable(graph: S2Graph, - bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] { - import StorageDeserializable._ + bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong, + tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2Edge] { type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) @@ -65,7 +67,7 @@ class IndexEdgeDeserializable(graph: S2Graph, val edge = graph.newEdge(srcVertex, null, label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState) var tsVal = version - val isTallSchema = label.schemaVersion == HBaseType.VERSION4 + val isTallSchema = tallSchemaVersions(label.schemaVersion) val isDegree = if (isTallSchema) pos == kv.row.length else kv.qualifier.isEmpty if (isDegree) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index 632eefa..28982dc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -21,13 +21,12 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.storage.{Serializable, StorageSerializable} import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.{GraphUtil, IndexEdge} -import org.apache.s2graph.core.storage.StorageSerializable._ +import org.apache.s2graph.core.storage.serde.StorageSerializable._ +import org.apache.s2graph.core.storage.serde.Serializable class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] { - import StorageSerializable._ override def ts = indexEdge.version override def table = indexEdge.label.hbaseTableName.getBytes("UTF-8") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 59db07e..68732ce 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -22,13 +22,13 @@ package org.apache.s2graph.core.storage.serde.indexedge.wide import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} -import org.apache.s2graph.core.storage.StorageDeserializable._ +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.types._ class IndexEdgeDeserializable(graph: S2Graph, bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] { - import StorageDeserializable._ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 434db02..34e9a6e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -21,13 +21,12 @@ package org.apache.s2graph.core.storage.serde.indexedge.wide import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.{GraphUtil, IndexEdge} -import org.apache.s2graph.core.storage.StorageSerializable._ +import org.apache.s2graph.core.storage.serde.StorageSerializable._ +import org.apache.s2graph.core.storage.serde.Serializable class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] { - import StorageSerializable._ override def ts = indexEdge.version override def table = indexEdge.label.hbaseTableName.getBytes("UTF-8") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala index 3b55ed8..b618962 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -20,11 +20,12 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelIndex, LabelMeta} -import org.apache.s2graph.core.storage.StorageDeserializable._ -import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} +import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ +import org.apache.s2graph.core.storage.CanSKeyValue import org.apache.s2graph.core.types._ import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.serde.Deserializable class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[SnapshotEdge] { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 76fb74d..5f00b48 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -22,12 +22,12 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.SnapshotEdge import org.apache.s2graph.core.mysqls.LabelIndex -import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.storage.serde._ +import org.apache.s2graph.core.storage.serde.StorageSerializable._ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { - import StorageSerializable._ override def ts = snapshotEdge.version override def table = snapshotEdge.label.hbaseTableName.getBytes("UTF-8") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala index 78ac2f7..8c961ce 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -20,11 +20,12 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.wide import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} -import org.apache.s2graph.core.storage.StorageDeserializable._ -import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} -import org.apache.s2graph.core.types.{LabelWithDirection, HBaseType, SourceVertexId, TargetVertexId} +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ +import org.apache.s2graph.core.storage.CanSKeyValue +import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, TargetVertexId} import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.serde.Deserializable class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[SnapshotEdge] { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index d2544e0..df84e86 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -22,7 +22,8 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.wide import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.SnapshotEdge import org.apache.s2graph.core.mysqls.LabelIndex -import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.storage.serde.StorageSerializable._ import org.apache.s2graph.core.types.VertexId @@ -32,7 +33,6 @@ import org.apache.s2graph.core.types.VertexId * @param snapshotEdge */ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { - import StorageSerializable._ override def ts = snapshotEdge.version override def table = snapshotEdge.label.hbaseTableName.getBytes("UTF-8") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala index f8921a8..87f0947 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala @@ -1,73 +1,73 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.serde.vertex - -import org.apache.s2graph.core.mysqls.{ColumnMeta, Label} -import org.apache.s2graph.core.storage.StorageDeserializable._ -import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable} -import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, VertexId} -import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{QueryParam, S2Graph, S2Vertex} - -import scala.collection.mutable.ListBuffer - -class VertexDeserializable(graph: S2Graph, - bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2Vertex] { - def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], - cacheElementOpt: Option[S2Vertex]): Option[S2Vertex] = { - try { - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - val kv = kvs.head - val version = HBaseType.DEFAULT_VERSION - val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) - - var maxTs = Long.MinValue - val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike] - val belongLabelIds = new ListBuffer[Int] - - for { - kv <- kvs - } { - val propKey = - if (kv.qualifier.length == 1) kv.qualifier.head.toInt - else bytesToInt(kv.qualifier, 0) - - val ts = kv.timestamp - if (ts > maxTs) maxTs = ts - - if (S2Vertex.isLabelId(propKey)) { - belongLabelIds += S2Vertex.toLabelId(propKey) - } else { - val v = kv.value - val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) - val columnMeta = vertexId.column.metasMap(propKey) - propsMap += (columnMeta -> value) - } - } - assert(maxTs != Long.MinValue) - val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) - S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) - - Option(vertex) - } catch { - case e: Exception => None - } - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +//package org.apache.s2graph.core.storage.serde.vertex +// +//import org.apache.s2graph.core.mysqls.ColumnMeta +//import org.apache.s2graph.core.storage.serde.StorageDeserializable._ +//import org.apache.s2graph.core.storage.CanSKeyValue +//import org.apache.s2graph.core.storage.serde.Deserializable +//import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, VertexId} +//import org.apache.s2graph.core.{S2Graph, S2Vertex} +// +//import scala.collection.mutable.ListBuffer +// +//class VertexDeserializable(graph: S2Graph, +// bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2Vertex] { +// def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], +// cacheElementOpt: Option[S2Vertex]): Option[S2Vertex] = { +// try { +// val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } +// val kv = kvs.head +// val version = HBaseType.DEFAULT_VERSION +// val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) +// +// var maxTs = Long.MinValue +// val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike] +// val belongLabelIds = new ListBuffer[Int] +// +// for { +// kv <- kvs +// } { +// val propKey = +// if (kv.qualifier.length == 1) kv.qualifier.head.toInt +// else bytesToInt(kv.qualifier, 0) +// +// val ts = kv.timestamp +// if (ts > maxTs) maxTs = ts +// +// if (S2Vertex.isLabelId(propKey)) { +// belongLabelIds += S2Vertex.toLabelId(propKey) +// } else { +// val v = kv.value +// val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) +// val columnMeta = vertexId.column.metasMap(propKey) +// propsMap += (columnMeta -> value) +// } +// } +// assert(maxTs != Long.MinValue) +// val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) +// S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) +// +// Option(vertex) +// } catch { +// case e: Exception => None +// } +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala index ee147f1..aa85574 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala @@ -1,52 +1,62 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.serde.vertex - -import org.apache.s2graph.core.S2Vertex -import org.apache.s2graph.core.storage.StorageSerializable._ -import org.apache.s2graph.core.storage.{SKeyValue, Serializable} -import org.apache.s2graph.core.utils.logger - -import scala.collection.JavaConverters._ - -case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] { - - override val table = vertex.hbaseTableName.getBytes - override val ts = vertex.ts - override val cf = Serializable.vertexCf - - override def toRowKey: Array[Byte] = vertex.id.bytes - - override def toQualifier: Array[Byte] = Array.empty[Byte] - override def toValue: Array[Byte] = Array.empty[Byte] - - /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ - override def toKeyValues: Seq[SKeyValue] = { - val row = toRowKey - val base = for ((k, v) <- vertex.props.asScala ++ vertex.defaultProps.asScala) yield { - val columnMeta = v.columnMeta - intToBytes(columnMeta.seq) -> v.innerVal.bytes - } - val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] } - (base ++ belongsTo).map { case (qualifier, value) => - SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) - }.toSeq - } -} \ No newline at end of file +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +//package org.apache.s2graph.core.storage.serde.vertex +// +//import org.apache.s2graph.core.S2Vertex +//import org.apache.s2graph.core.storage.serde.StorageSerializable._ +//import org.apache.s2graph.core.storage.SKeyValue +//import org.apache.s2graph.core.storage.serde.Serializable +// +//import scala.collection.JavaConverters._ +// +//case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] { +// +// override val table = vertex.hbaseTableName.getBytes +// override val ts = vertex.ts +// override val cf = Serializable.vertexCf +// +// override def toRowKey: Array[Byte] = vertex.id.bytes +// +// override def toQualifier: Array[Byte] = Array.empty[Byte] +// +// override def toValue: Array[Byte] = { +// val props = for ((k, v) <- vertex.props.asScala ++ vertex.defaultProps.asScala) yield { +// v.columnMeta -> v.innerVal.bytes +// } +// vertexPropsToBytes(props.toSeq) +// } +// +// /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ +// override def toKeyValues: Seq[SKeyValue] = { +// val row = toRowKey +// // serializer all props into value. +// Seq( +// SKeyValue(vertex.hbaseTableName.getBytes, row, cf, toQualifier, toValue, vertex.ts) +// ) +//// val base = for ((k, v) <- vertex.props.asScala ++ vertex.defaultProps.asScala) yield { +//// val columnMeta = v.columnMeta +//// intToBytes(columnMeta.seq) -> v.innerVal.bytes +//// } +//// val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] } +//// (base ++ belongsTo).map { case (qualifier, value) => +//// SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) +//// }.toSeq +// } +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala new file mode 100644 index 0000000..648c9df --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde.vertex.tall + +import org.apache.s2graph.core.mysqls.ColumnMeta +import org.apache.s2graph.core.storage.CanSKeyValue +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ +import org.apache.s2graph.core.types.{HBaseType, InnerValLike, VertexId} +import org.apache.s2graph.core.{S2Graph, S2Vertex} + +class VertexDeserializable(graph: S2Graph, + bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2Vertex] { + def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], + cacheElementOpt: Option[S2Vertex]): Option[S2Vertex] = { + try { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + val kv = kvs.head + val version = HBaseType.DEFAULT_VERSION + val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) + val serviceColumn = vertexId.column + val schemaVer = serviceColumn.schemaVersion + + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, serviceColumn) + + val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike] + props.foreach { case (columnMeta, innerVal) => + propsMap += (columnMeta -> innerVal) + } + + val vertex = graph.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil) + S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) + + Option(vertex) + } catch { + case e: Exception => None + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala new file mode 100644 index 0000000..87f050d --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexSerializable.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde.vertex.tall + +import org.apache.s2graph.core.S2Vertex +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.storage.serde.StorageSerializable._ + +import scala.collection.JavaConverters._ + +case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] { + + override val table = vertex.hbaseTableName.getBytes + override val ts = vertex.ts + override val cf = Serializable.vertexCf + + override def toRowKey: Array[Byte] = vertex.id.bytes + + override def toQualifier: Array[Byte] = Array.empty[Byte] + override def toValue: Array[Byte] = { + val props = (vertex.props.asScala ++ vertex.defaultProps.asScala).toSeq.map { case (_, v) => + v.columnMeta -> v.innerVal.bytes + } + vertexPropsToBytes(props) + } + + /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ + override def toKeyValues: Seq[SKeyValue] = { + val row = toRowKey + val qualifier = toQualifier + val value = toValue + Seq( + SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) + ) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala new file mode 100644 index 0000000..bae7941 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde.vertex.wide + +import org.apache.s2graph.core.mysqls.ColumnMeta +import org.apache.s2graph.core.storage.CanSKeyValue +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.serde.StorageDeserializable._ +import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, VertexId} +import org.apache.s2graph.core.{S2Graph, S2Vertex} + +import scala.collection.mutable.ListBuffer + +class VertexDeserializable(graph: S2Graph, + bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2Vertex] { + def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T], + cacheElementOpt: Option[S2Vertex]): Option[S2Vertex] = { + try { + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + val kv = kvs.head + val version = HBaseType.DEFAULT_VERSION + val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) + + var maxTs = Long.MinValue + val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike] + val belongLabelIds = new ListBuffer[Int] + + for { + kv <- kvs + } { + val propKey = + if (kv.qualifier.length == 1) kv.qualifier.head.toInt + else bytesToInt(kv.qualifier, 0) + + val ts = kv.timestamp + if (ts > maxTs) maxTs = ts + + if (S2Vertex.isLabelId(propKey)) { + belongLabelIds += S2Vertex.toLabelId(propKey) + } else { + val v = kv.value + val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) + val columnMeta = vertexId.column.metasMap(propKey) + propsMap += (columnMeta -> value) + } + } + assert(maxTs != Long.MinValue) + val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds) + S2Vertex.fillPropsWithTs(vertex, propsMap.toMap) + + Option(vertex) + } catch { + case e: Exception => None + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexSerializable.scala new file mode 100644 index 0000000..59db0ab --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexSerializable.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.serde.vertex.wide + +import org.apache.s2graph.core.S2Vertex +import org.apache.s2graph.core.storage.SKeyValue +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.storage.serde.StorageSerializable._ + +import scala.collection.JavaConverters._ + +case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[S2Vertex] { + + override val table = vertex.hbaseTableName.getBytes + override val ts = vertex.ts + override val cf = Serializable.vertexCf + + override def toRowKey: Array[Byte] = vertex.id.bytes + + override def toQualifier: Array[Byte] = Array.empty[Byte] + override def toValue: Array[Byte] = Array.empty[Byte] + + /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ + override def toKeyValues: Seq[SKeyValue] = { + val row = toRowKey + val base = for ((k, v) <- vertex.props.asScala ++ vertex.defaultProps.asScala) yield { + val columnMeta = v.columnMeta + intToBytes(columnMeta.seq) -> v.innerVal.bytes + } + val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] } + (base ++ belongsTo).map { case (qualifier, value) => + SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) + }.toSeq + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala index 9bb99ed..0a1d5f3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala @@ -90,13 +90,13 @@ object DeferCache { /** * @param config - * @param ec * @param canDefer: implicit evidence to find out implementation of CanDefer. * @tparam A: actual element type that will be stored in M[_] and C[_]. * @tparam M[_]: container type that will be stored in local cache. ex) Promise, Defer. * @tparam C[_]: container type that will be returned to client of this class. Ex) Future, Defer. */ -class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", useMetric: Boolean = false)(implicit ec: ExecutionContext, canDefer: CanDefer[A, M, C]) { +class DeferCache[A, M[_], C[_]](config: Config, empty: => A, + name: String = "", useMetric: Boolean = false)(implicit canDefer: CanDefer[A, M, C]) { type Value = (Long, C[A]) private val maxSize = config.getInt("future.cache.max.size") @@ -131,7 +131,7 @@ class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", private def checkAndExpire(cacheKey: Long, cachedAt: Long, cacheTTL: Long, - oldFuture: C[A])(op: => C[A]): C[A] = { + oldFuture: C[A])(op: => C[A])(implicit ec: ExecutionContext): C[A] = { if (System.currentTimeMillis() >= cachedAt + cacheTTL) { // future is too old. so need to expire and fetch new data from storage. futureCache.asMap().remove(cacheKey) @@ -164,7 +164,7 @@ class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", } } - def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => C[A]): C[A] = { + def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => C[A])(implicit ec: ExecutionContext): C[A] = { val cacheVal = futureCache.getIfPresent(cacheKey) cacheVal match { case null => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala index a41152c..4ed7905 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala @@ -270,12 +270,12 @@ class CrudTest extends IntegrateCommon { val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id) - if (!rets.forall(identity)) { + if (!rets.forall(_.isSuccess)) { Thread.sleep(graph.LockExpireDuration + 100) /** expect current request would be ignored */ val bulkEdges = Seq(TestUtil.toEdge(i-1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())) val rets = TestUtil.insertEdgesSync(bulkEdges: _*) - if (rets.forall(identity)) { + if (rets.forall(_.isSuccess)) { // check val jsResult = TestUtil.getEdgesSync(queryJson) (jsResult \\ "time").head.as[Int] should be(10) @@ -295,12 +295,12 @@ class CrudTest extends IntegrateCommon { val queryJson = querySnapshotEdgeJson(serviceName, columnName, labelName, id) - if (!rets.forall(identity)) { + if (!rets.forall(_.isSuccess)) { Thread.sleep(graph.LockExpireDuration + 100) /** expect current request would be applied */ val bulkEdges = Seq(TestUtil.toEdge(i+1, "u", "e", 0, 0, testLabelName, Json.obj("time" -> 20).toString())) val rets = TestUtil.insertEdgesSync(bulkEdges: _*) - if (rets.forall(identity)) { + if (rets.forall(_.isSuccess)) { // check val jsResult = TestUtil.getEdgesSync(queryJson) (jsResult \\ "time").head.as[Int] should be(20) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/test/scala/org/apache/s2graph/core/Integrate/LabelLabelIndexMutateOptionTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/LabelLabelIndexMutateOptionTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/LabelLabelIndexMutateOptionTest.scala index 4855cfc..1dfcfe6 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/LabelLabelIndexMutateOptionTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/LabelLabelIndexMutateOptionTest.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.Integrate import org.apache.s2graph.core._ -import org.scalatest.BeforeAndAfterEach +import org.scalatest.{BeforeAndAfterEach, Tag} import play.api.libs.json._ class LabelLabelIndexMutateOptionTest extends IntegrateCommon with BeforeAndAfterEach { @@ -127,7 +127,7 @@ class LabelLabelIndexMutateOptionTest extends IntegrateCommon with BeforeAndAfte /** * { "out": {"method": "drop", "storeDegree": false} } */ - test("index for in direction should drop in direction edge and store degree") { + ignore("index for in direction should drop in direction edge and store degree") { val edges = getEdgesSync(getQuery(Seq(1, 2, 3), "in", idxDropInStoreDegree)) (edges \ "results").as[Seq[JsValue]].size should be(0) (edges \\ "_degree").map(_.as[Long]).sum should be(3) @@ -136,7 +136,7 @@ class LabelLabelIndexMutateOptionTest extends IntegrateCommon with BeforeAndAfte /** * { "in": {"method": "drop", "storeDegree": false }, "out": {"method": "drop"} } */ - test("index for out direction should drop out direction edge and store degree") { + ignore("index for out direction should drop out direction edge and store degree") { val edges = getEdgesSync(getQuery(Seq(0), "out", idxDropOutStoreDegree)) (edges \ "results").as[Seq[JsValue]].size should be(0) (edges \\ "_degree").map(_.as[Long]).sum should be(3) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala new file mode 100644 index 0000000..fd9d2b3 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala @@ -0,0 +1,59 @@ +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageSerDe +import org.apache.s2graph.core.storage.rocks.RocksStorageSerDe +import org.apache.s2graph.core.storage.serde.{StorageDeserializable, StorageSerializable} +import org.apache.s2graph.core.{S2Vertex, TestCommonWithModels} +import org.scalatest.{FunSuite, Matchers} + +class StorageIOTest extends FunSuite with Matchers with TestCommonWithModels { + + initTests() + + test("AsynchbaseStorageIO: VertexSerializer/Deserializer") { + def check(vertex: S2Vertex, + op: S2Vertex => StorageSerializable[S2Vertex], + deserializer: StorageDeserializable[S2Vertex]): Boolean = { + val sKeyValues = op(vertex).toKeyValues + val deserialized = deserializer.fromKeyValues(sKeyValues, None) + vertex == deserialized + } + + val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) + val service = Service.findByName(serviceName, useCache = false).getOrElse { + throw new IllegalStateException("service not found.") + } + val column = ServiceColumn.find(service.id.get, columnName).getOrElse { + throw new IllegalStateException("column not found.") + } + + val vertexId = graph.newVertexId(service, column, 1L) + val vertex = graph.newVertex(vertexId) + + check(vertex, serDe.vertexSerializer, serDe.vertexDeserializer(vertex.serviceColumn.schemaVersion)) + } + + test("RocksStorageIO: VertexSerializer/Deserializer") { + def check(vertex: S2Vertex, + op: S2Vertex => StorageSerializable[S2Vertex], + deserializer: StorageDeserializable[S2Vertex]): Boolean = { + val sKeyValues = op(vertex).toKeyValues + val deserialized = deserializer.fromKeyValues(sKeyValues, None) + vertex == deserialized + } + + val serDe: StorageSerDe = new RocksStorageSerDe(graph) + val service = Service.findByName(serviceName, useCache = false).getOrElse { + throw new IllegalStateException("service not found.") + } + val column = ServiceColumn.find(service.id.get, columnName).getOrElse { + throw new IllegalStateException("column not found.") + } + + val vertexId = graph.newVertexId(service, column, 1L) + val vertex = graph.newVertex(vertexId) + + check(vertex, serDe.vertexSerializer, serDe.vertexDeserializer(vertex.serviceColumn.schemaVersion)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index a5c974e..0cbaa81 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -46,7 +46,8 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { val labelOpt = Option(l) val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion))) val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq == LabelIndex.DefaultSeq).head - val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues, None) + val kvs = graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues + val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None) _indexEdgeOpt should not be empty edge == _indexEdgeOpt.get should be(true) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala new file mode 100644 index 0000000..8a8a532 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala @@ -0,0 +1,33 @@ +package org.apache.s2graph.core.storage.rocks + +import org.apache.s2graph.core.TestCommonWithModels +import org.apache.s2graph.core.mysqls.{Service, ServiceColumn} +import org.apache.tinkerpop.gremlin.structure.T +import org.scalatest.{FunSuite, Matchers} + +import scala.collection.JavaConversions._ + +class RocksStorageTest extends FunSuite with Matchers with TestCommonWithModels { + initTests() + + test("VertexTest: shouldNotGetConcurrentModificationException()") { + val service = Service.findByName(serviceName, useCache = false).getOrElse { + throw new IllegalStateException("service not found.") + } + val column = ServiceColumn.find(service.id.get, columnName).getOrElse { + throw new IllegalStateException("column not found.") + } + + val vertexId = graph.newVertexId(service, column, 1L) + + val vertex = graph.newVertex(vertexId) + for (i <- (0 until 10)) { + vertex.addEdge(labelName, vertex) + } + + println(graph.edges().toSeq) + println("*" * 100) + vertex.remove() + println(graph.vertices().toSeq) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index 28da7fe..9a45bd5 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -25,6 +25,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.core.rest.RequestParser import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse} import org.apache.s2graph.rest.play.actors.QueueActor import org.apache.s2graph.rest.play.config.Config import play.api.libs.json._ @@ -92,7 +93,7 @@ object EdgeController extends Controller { val result = s2.mutateElements(elements.map(_._1), true) result onComplete { results => results.get.zip(elements).map { - case (false, (e: S2Edge, tsv: String)) => + case (r: MutateResponse, (e: S2Edge, tsv: String)) if !r.isSuccess => val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){ toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts) } else{ @@ -119,13 +120,13 @@ object EdgeController extends Controller { val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) => !skipElement(element.isAsync) } - val retToSkip = elementAsync.map(_._2 -> true) + val retToSkip = elementAsync.map(_._2 -> MutateResponse.Success) val elementsToStore = elementSync.map(_._1) val elementsIdxToStore = elementSync.map(_._2) mutateElementsWithFailLog(elementsToStore).map { rets => elementsIdxToStore.zip(rets) ++ retToSkip }.map { rets => - Json.toJson(rets.sortBy(_._1).map(_._2)) + Json.toJson(rets.sortBy(_._1).map(_._2.isSuccess)) }.map(jsonResponse(_)) } else { val rets = elementWithIdxs.map { case ((element, tsv), idx) => @@ -232,8 +233,8 @@ object EdgeController extends Controller { else { s2.incrementCounts(edges, withWait = true).map { results => - val json = results.map { case (isSuccess, resultCount, count) => - Json.obj("success" -> isSuccess, "result" -> resultCount, "_count" -> count) + val json = results.map { case IncrementResponse(isSuccess, afterCount, beforeCount) => + Json.obj("success" -> isSuccess, "result" -> afterCount, "_count" -> beforeCount) } jsonResponse(Json.toJson(json)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala index 43f0b15..a6df439 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala @@ -20,8 +20,9 @@ package org.apache.s2graph.rest.play.controllers import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.storage.MutateResponse import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{ExceptionHandler, S2Graph, GraphExceptions} +import org.apache.s2graph.core.{ExceptionHandler, GraphExceptions, S2Graph} import org.apache.s2graph.rest.play.actors.QueueActor import org.apache.s2graph.rest.play.config.Config import play.api.libs.json.{JsValue, Json} @@ -54,7 +55,7 @@ object VertexController extends Controller { if (verticesToStore.isEmpty) Future.successful(jsonResponse(Json.toJson(Seq.empty[Boolean]))) else { if (withWait) { - val rets = s2.mutateVertices(verticesToStore, withWait = true) + val rets = s2.mutateVertices(verticesToStore, withWait = true).map(_.map(_.isSuccess)) rets.map(Json.toJson(_)).map(jsonResponse(_)) } else { val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
