http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala new file mode 100644 index 0000000..eab5cab --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -0,0 +1,438 @@ +package org.apache.s2graph.core.storage + +import java.util.concurrent.{Executors, TimeUnit} + +import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, NoStackException} +import org.apache.s2graph.core._ +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Random + +class WriteWriteConflictResolver(graph: S2Graph, + serDe: StorageSerDe, + io: StorageIO, + mutator: StorageWritable, + fetcher: StorageReadable[_]) { + + val BackoffTimeout = graph.BackoffTimeout + val MaxRetryNum = graph.MaxRetryNum + val MaxBackOff = graph.MaxBackOff + val FailProb = graph.FailProb + val LockExpireDuration = graph.LockExpireDuration + val MaxSize = graph.MaxSize + val ExpireAfterWrite = graph.ExpireAfterWrite + val ExpireAfterAccess = graph.ExpireAfterAccess + + /** retry scheduler */ + val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() + + protected def exponentialBackOff(tryNum: Int) = { + // time slot is divided by 10 ms + val slot = 10 + Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt) + } + + def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = { + if (tryNum >= MaxRetryNum) { + edges.foreach { edge => + logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") + } + + Future.successful(false) + } else { + val future = commitUpdate(edges, statusCode, fetchedSnapshotEdgeOpt) + future.onSuccess { + case success => + logger.debug(s"Finished. [$tryNum]\n${edges.head.toLogString}\n") + } + future recoverWith { + case FetchTimeoutException(retryEdge) => + logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") + /* fetch failed. re-fetch should be done */ + fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) + } + + case PartialFailureException(retryEdge, failedStatusCode, faileReason) => + val status = failedStatusCode match { + case 0 => "AcquireLock failed." + case 1 => "Mutation failed." + case 2 => "Increment failed." + case 3 => "ReleaseLock failed." + case 4 => "Unknown" + } + logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") + + /* retry logic */ + val promise = Promise[Boolean] + val backOff = exponentialBackOff(tryNum) + scheduledThreadPool.schedule(new Runnable { + override def run(): Unit = { + val future = if (failedStatusCode == 0) { + // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge. + /* fetch failed. re-fetch should be done */ + fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) + } + } else { + // partial failure occur while self locked and mutating. 
+ // assert(fetchedSnapshotEdgeOpt.nonEmpty) + retry(tryNum + 1)(edges, failedStatusCode, fetchedSnapshotEdgeOpt) + } + promise.completeWith(future) + } + + }, backOff, TimeUnit.MILLISECONDS) + promise.future + + case ex: Exception => + logger.error("Unknown exception", ex) + Future.successful(false) + } + } + } + + protected def commitUpdate(edges: Seq[S2Edge], + statusCode: Byte, + fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = { + // Future.failed(new PartialFailureException(edges.head, 0, "ahahah")) + assert(edges.nonEmpty) + // assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined) + + statusCode match { + case 0 => + fetchedSnapshotEdgeOpt match { + case None => + /* + * no one has never mutated this SN. + * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges) + * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1) + * lock = (squashedEdge, pendingE) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + + assert(edgeMutate.newSnapshotEdge.isDefined) + + val lockTs = Option(System.currentTimeMillis()) + val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1) + val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, + pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) + + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + + case Some(snapshotEdge) => + snapshotEdge.pendingEdgeOpt match { + case None => + /* + * others finished commit on this SN. but there is no contention. + * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges) + * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ? 
+ * lock = (snapshotEdge, pendingE) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + if (edgeMutate.newSnapshotEdge.isEmpty) { + logger.debug(s"drop this requests: \n${edges.map(_.toLogString).mkString("\n")}") + Future.successful(true) + } else { + val lockTs = Option(System.currentTimeMillis()) + val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) + val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge)) + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, + pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + } + case Some(pendingEdge) => + val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() + if (isLockExpired) { + /* + * if pendingEdge.ts == snapshotEdge.ts => + * (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge)) + * else => + * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, Seq(pendingEdge)) + * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) + * lock = (snapshotEdge, pendingE) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + logger.debug(s"${pendingEdge.toLogString} has been expired.") + val (squashedEdge, edgeMutate) = + if (pendingEdge.ts == snapshotEdge.ts) S2Edge.buildOperation(None, pendingEdge +: edges) + else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges) + + val lockTs = Option(System.currentTimeMillis()) + val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1) + val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge)) + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0, + pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1) + + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + } else { + /* + * others finished commit on this SN and there is currently contention. + * this can't be proceed so retry from re-fetch. + * throw EX + */ + val (squashedEdge, _) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges) + Future.failed(new PartialFailureException(squashedEdge, 0, s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]")) + } + } + + } + case _ => + + /* + * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock + */ + + /* + * this succeed to lock this SN. keep doing on commit process. + * if SN.isEmpty => + * no one never succed to commit on this SN. + * this is first mutation try on this SN. + * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges) + * else => + * assert(SN.pengingEdgeOpt.isEmpty) no-fetch after acquire lock when self retrying. + * there has been success commit on this SN. 
+ * (squashedEdge, edgeMutate) = Edge.buildOperation(SN, edges) + * releaseLock = (edgeMutate.newSnapshotEdge, None) + */ + val _edges = + if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges + else edges + val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) + val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2 + val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match { + case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) + case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion) + } + // lockSnapshotEdge will be ignored. + commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, releaseLockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) + } + } + + /** + * orchestrate commit process. + * we separate into 4 step to avoid duplicating each step over and over. + * + * @param statusCode : current statusCode of this thread to process edges. + * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. + * @param fetchedSnapshotEdgeOpt : fetched snapshotEdge from storage before commit process begin. + * @param lockSnapshotEdge : lockEdge that hold necessary data to lock this snapshotEdge for this thread. + * @param releaseLockSnapshotEdge : releaseLockEdge that will remove lock by storing new final merged states + * all from current request edges and fetched snapshotEdge. + * @param edgeMutate : mutations for indexEdge and snapshotEdge. + * @return + */ + protected def commitProcess(statusCode: Byte, + squashedEdge: S2Edge, + fetchedSnapshotEdgeOpt: Option[S2Edge], + lockSnapshotEdge: SnapshotEdge, + releaseLockSnapshotEdge: SnapshotEdge, + edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = { + for { + locked <- acquireLock(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge) + mutated <- commitIndexEdgeMutations(locked, statusCode, squashedEdge, edgeMutate) + incremented <- commitIndexEdgeDegreeMutations(mutated, statusCode, squashedEdge, edgeMutate) + lockReleased <- releaseLock(incremented, statusCode, squashedEdge, releaseLockSnapshotEdge) + } yield lockReleased + } + + case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason) + + protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { + val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") + logger.debug(msg) + } + + protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge, edgeMutate: EdgeMutate) = { + val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}", + s"${edgeMutate.toLogString}").mkString("\n") + logger.debug(msg) + } + + /** + * try to acquire lock on storage for this given snapshotEdge(lockEdge). + * + * @param statusCode : current statusCode of this thread to process edges. + * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param fetchedSnapshotEdgeOpt : fetched snapshot edge from storage. + * @param lockEdge : lockEdge to build RPC request(compareAndSet) into Storage. 
+ * @return + */ + protected def acquireLock(statusCode: Byte, + squashedEdge: S2Edge, + fetchedSnapshotEdgeOpt: Option[S2Edge], + lockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = { + if (statusCode >= 1) { + logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) { + Future.failed(new PartialFailureException(squashedEdge, 0, s"$p")) + } else { + val lockEdgePut = serDe.snapshotEdgeSerializer(lockEdge).toKeyValues.head + val oldPut = fetchedSnapshotEdgeOpt.map(e => serDe.snapshotEdgeSerializer(e.toSnapshotEdge).toKeyValues.head) + mutator.writeLock(lockEdgePut, oldPut).recoverWith { case ex: Exception => + logger.error(s"AcquireLock RPC Failed.") + throw new PartialFailureException(squashedEdge, 0, "AcquireLock RPC Failed") + }.map { ret => + if (ret.isSuccess) { + val log = Seq( + "\n", + "=" * 50, + s"[Success]: acquireLock", + s"[RequestEdge]: ${squashedEdge.toLogString}", + s"[LockEdge]: ${lockEdge.toLogString()}", + s"[PendingEdge]: ${lockEdge.pendingEdgeOpt.map(_.toLogString).getOrElse("")}", + "=" * 50, "\n").mkString("\n") + + logger.debug(log) + // debug(ret, "acquireLock", edge.toSnapshotEdge) + } else { + throw new PartialFailureException(squashedEdge, 0, "hbase fail.") + } + true + } + } + } + } + + + /** + * change this snapshot's state on storage from locked into committed by + * storing new merged states on storage. merge state come from releaseLockEdge. + * note that releaseLock return Future.failed on predicate failure. + * + * @param predicate : indicate if this releaseLock phase should be proceed or not. + * @param statusCode : releaseLock do not use statusCode, only for debug. + * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param releaseLockEdge : final merged states if all process goes well. + * @return + */ + protected def releaseLock(predicate: Boolean, + statusCode: Byte, + squashedEdge: S2Edge, + releaseLockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = { + if (!predicate) { + Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed.")) + } else { + val p = Random.nextDouble() + if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p")) + else { + val releaseLockEdgePuts = serDe.snapshotEdgeSerializer(releaseLockEdge).toKeyValues + mutator.writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith { + case ex: Exception => + logger.error(s"ReleaseLock RPC Failed.") + throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed") + }.map { ret => + if (ret.isSuccess) { + debug(ret.isSuccess, "releaseLock", squashedEdge.toSnapshotEdge) + } else { + val msg = Seq("\nFATAL ERROR\n", + "=" * 50, + squashedEdge.toLogString, + releaseLockEdgePuts, + "=" * 50, + "\n" + ) + logger.error(msg.mkString("\n")) + // error(ret, "releaseLock", edge.toSnapshotEdge) + throw new PartialFailureException(squashedEdge, 3, "hbase fail.") + } + true + } + } + } + } + + /** + * + * @param predicate : indicate if this commitIndexEdgeMutations phase should be proceed or not. + * @param statusCode : current statusCode of this thread to process edges. + * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param edgeMutate : actual collection of mutations. 
note that edgeMutate contains snapshotEdge mutations, + * but in here, we only use indexEdge's mutations. + * @return + */ + protected def commitIndexEdgeMutations(predicate: Boolean, + statusCode: Byte, + squashedEdge: S2Edge, + edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = { + if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed.")) + else { + if (statusCode >= 2) { + logger.debug(s"skip mutate: [$statusCode]\n${squashedEdge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p")) + else + mutator.writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, io.indexedEdgeMutations(edgeMutate), withWait = true).map { ret => + if (ret.isSuccess) { + debug(ret.isSuccess, "mutate", squashedEdge.toSnapshotEdge, edgeMutate) + } else { + throw new PartialFailureException(squashedEdge, 1, "hbase fail.") + } + true + } + } + } + } + + /** + * + * @param predicate : indicate if this commitIndexEdgeMutations phase should be proceed or not. + * @param statusCode : current statusCode of this thread to process edges. + * @param squashedEdge : squashed(in memory) final edge from input edges on same snapshotEdge. only for debug + * @param edgeMutate : actual collection of mutations. note that edgeMutate contains snapshotEdge mutations, + * but in here, we only use indexEdge's degree mutations. + * @return + */ + protected def commitIndexEdgeDegreeMutations(predicate: Boolean, + statusCode: Byte, + squashedEdge: S2Edge, + edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = { + + def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { + mutator.writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, kvs, withWait = withWait).map { ret => + if (ret.isSuccess) { + debug(ret.isSuccess, "increment", squashedEdge.toSnapshotEdge, edgeMutate) + } else { + throw new PartialFailureException(squashedEdge, 2, "hbase fail.") + } + true + } + } + + if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, "predicate failed.")) + if (statusCode >= 3) { + logger.debug(s"skip increment: [$statusCode]\n${squashedEdge.toLogString}") + Future.successful(true) + } else { + val p = Random.nextDouble() + if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p")) + else { + val (bufferIncr, nonBufferIncr) = io.increments(edgeMutate.deepCopy) + + if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false) + _write(nonBufferIncr, withWait = true) + } + } + } + + /** end of methods for consistency */ + + def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge], + newEdge: S2Edge, edgeMutate: EdgeMutate) = + Seq("----------------------------------------------", + s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}", + s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}", + s"newEdge: ${newEdge.toLogString}", + s"mutation: \n${edgeMutate.toLogString}", + "----------------------------------------------").mkString("\n") + +}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 4fb2240..54007d5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -20,38 +20,18 @@ package org.apache.s2graph.core.storage.hbase - import java.util -import java.util.Base64 -import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import java.util.concurrent.{ExecutorService, Executors} -import com.stumbleupon.async.{Callback, Deferred} import com.typesafe.config.Config import org.apache.commons.io.FileUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability} -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.regionserver.BloomType -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} -import org.apache.hadoop.security.UserGroupInformation import org.apache.s2graph.core._ -import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn} import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange} -import org.apache.s2graph.core.types.{HBaseType, VertexId} +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.AsyncRPC import org.apache.s2graph.core.utils._ -import org.hbase.async.FilterList.Operator.MUST_PASS_ALL import org.hbase.async._ - +import org.apache.s2graph.core.storage.serde._ import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer -import scala.concurrent._ -import scala.concurrent.duration.Duration -import scala.util.Try -import scala.util.control.NonFatal -import scala.util.hashing.MurmurHash3 object AsynchbaseStorage { @@ -92,6 +72,7 @@ object AsynchbaseStorage { } case class ScanWithRange(scan: Scanner, offset: Int, limit: Int) + type AsyncRPC = Either[GetRequest, ScanWithRange] def initLocalHBase(config: Config, @@ -159,734 +140,29 @@ object AsynchbaseStorage { class AsynchbaseStorage(override val graph: S2Graph, - override val config: Config)(implicit ec: ExecutionContext) - extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) { - - import Extensions.DeferOps - - val hbaseExecutor: ExecutorService = - if (config.getString("hbase.zookeeper.quorum") == "localhost") - AsynchbaseStorage.initLocalHBase(config) - else - null - - /** - * Asynchbase client setup. - * note that we need two client, one for bulk(withWait=false) and another for withWait=true - */ - private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort - - /** - * since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase. - * to fix version conflict, make this as lazy val for clients that don't require hbase client. 
- */ - lazy val client = AsynchbaseStorage.makeClient(config) - lazy val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") - lazy val clients = Seq(client, clientWithFlush) - - private val emptyKeyValues = new util.ArrayList[KeyValue]() - private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]() - private val emptyStepResult = new util.ArrayList[StepResult]() - - private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client - - import CanDefer._ - - /** Future Cache to squash request */ - lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true) - - /** Simple Vertex Cache */ - lazy private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue]) - - private val zkQuorum = config.getString("hbase.zookeeper.quorum") - private val zkQuorumSlave = - if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum")) - else None - - /** v4 max next row size */ - private val v4_max_num_rows = 10000 - private def getV4MaxNumRows(limit : Int): Int = { - if (limit < v4_max_num_rows) limit - else v4_max_num_rows - } - - /** - * fire rpcs into proper hbase cluster using client and - * return true on all mutation success. otherwise return false. - */ - override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { - if (kvs.isEmpty) Future.successful(true) - else { - val _client = client(withWait) - val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment) - - /* Asynchbase IncrementRequest does not implement HasQualifiers */ - val incrementsFutures = increments.map { kv => - val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) - val defer = _client.atomicIncrement(inc) - val future = defer.toFuture(Long.box(0)).map(_ => true).recover { case ex: Exception => - logger.error(s"mutation failed. $kv", ex) - false - } - if (withWait) future else Future.successful(true) - } - - /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */ - val othersFutures = putAndDeletes.groupBy { kv => - (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp) - }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) => - - val durability = groupedKeyValues.head.durability - val qualifiers = new ArrayBuffer[Array[Byte]]() - val values = new ArrayBuffer[Array[Byte]]() - - groupedKeyValues.foreach { kv => - if (kv.qualifier != null) qualifiers += kv.qualifier - if (kv.value != null) values += kv.value - } - val defer = operation match { - case SKeyValue.Put => - val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp) - put.setDurable(durability) - _client.put(put) - case SKeyValue.Delete => - val delete = - if (qualifiers.isEmpty) - new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp) - else - new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp) - delete.setDurable(durability) - _client.delete(delete) - } - if (withWait) { - defer.toFuture(new AnyRef()).map(_ => true).recover { case ex: Exception => - groupedKeyValues.foreach { kv => logger.error(s"mutation failed. 
$kv", ex) } - false - } - } else Future.successful(true) - } - for { - incrementRets <- Future.sequence(incrementsFutures) - otherRets <- Future.sequence(othersFutures) - } yield (incrementRets ++ otherRets).forall(identity) - } - } - - private def fetchKeyValues(rpc: AsyncRPC): Future[Seq[SKeyValue]] = { - val defer = fetchKeyValuesInner(rpc) - defer.toFuture(emptyKeyValues).map { kvsArr => - kvsArr.map { kv => - implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - } - } - } - - override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = { - val edge = toRequestEdge(queryRequest, Nil) - val rpc = buildRequest(queryRequest, edge) - - fetchKeyValues(rpc) - } - - /** - * since HBase natively provide CheckAndSet on storage level, implementation becomes simple. - * @param rpc: key value that is need to be stored on storage. - * @param expectedOpt: last valid value for rpc's KeyValue.value from fetching. - * @return return true if expected value matches and our rpc is successfully applied, otherwise false. - * note that when some other thread modified same cell and have different value on this KeyValue, - * then HBase atomically return false. - */ - override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = { - val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp) - val expected = expectedOpt.map(_.value).getOrElse(Array.empty) - client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true) - } - - - /** - * given queryRequest, build storage specific RPC Request. - * In HBase case, we either build Scanner or GetRequest. - * - * IndexEdge layer: - * Tall schema(v4): use scanner. - * Wide schema(label's schema version in v1, v2, v3): use GetRequest with columnRangeFilter - * when query is given with itnerval option. - * SnapshotEdge layer: - * Tall schema(v3, v4): use GetRequest without column filter. - * Wide schema(label's schema version in v1, v2): use GetRequest with columnRangeFilter. - * Vertex layer: - * all version: use GetRequest without column filter. - * @param queryRequest - * @return Scanner or GetRequest with proper setup with StartKey, EndKey, RangeFilter. - */ - override def buildRequest(queryRequest: QueryRequest, edge: S2Edge): AsyncRPC = { - import Serializable._ - val queryParam = queryRequest.queryParam - val label = queryParam.label - - val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { - val snapshotEdge = edge.toSnapshotEdge - snapshotEdgeSerializer(snapshotEdge) - } else { - val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq) - indexEdgeSerializer(indexEdge) - } - - val rowKey = serializer.toRowKey - val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue)) - - val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) - - label.schemaVersion match { - case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty => - val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName) - scanner.setFamily(edgeCf) - - /* - * TODO: remove this part. 
- */ - val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq) - val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) - - val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes - val labelWithDirBytes = indexEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - - val (startKey, stopKey) = - if (queryParam.intervalOpt.isDefined) { - // interval is set. - val _startKey = queryParam.cursorOpt match { - case Some(cursor) => Base64.getDecoder.decode(cursor) - case None => Bytes.add(baseKey, intervalMaxBytes) - } - (_startKey , Bytes.add(baseKey, intervalMinBytes)) - } else { - /* - * note: since propsToBytes encode size of property map at first byte, we are sure about max value here - */ - val _startKey = queryParam.cursorOpt match { - case Some(cursor) => Base64.getDecoder.decode(cursor) - case None => baseKey - } - (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) - } - - scanner.setStartKey(startKey) - scanner.setStopKey(stopKey) - - if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") - - scanner.setMaxVersions(1) - // TODO: exclusive condition innerOffset with cursorOpt - if (queryParam.cursorOpt.isDefined) { - scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit)) - } else { - scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit)) - } - scanner.setMaxTimestamp(maxTs) - scanner.setMinTimestamp(minTs) - scanner.setRpcTimeout(queryParam.rpcTimeout) - - // SET option for this rpc properly. - if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit)) - else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit)) - - case _ => - val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) { - new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier) - } else { - new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf) - } - - get.maxVersions(1) - get.setFailfast(true) - get.setMinTimestamp(minTs) - get.setMaxTimestamp(maxTs) - get.setTimeout(queryParam.rpcTimeout) - - val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset) - val columnRangeFilterOpt = queryParam.intervalOpt.map { interval => - new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true) - } - get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL)) - Left(get) - } - } + override val config: Config) extends Storage[AsyncRPC](graph, config) { /** - * we are using future cache to squash requests into same key on storage. - * - * @param queryRequest - * @param isInnerCall - * @param parentEdges - * @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future. - * seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback - */ - override def fetch(queryRequest: QueryRequest, - isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Deferred[StepResult] = { - - def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = { - val prevStepScore = queryRequest.prevStepScore - val fallbackFn: (Exception => StepResult) = { ex => - logger.error(s"fetchInner failed. fallback return. 
$hbaseRpc}", ex) - StepResult.Failure - } - - val queryParam = queryRequest.queryParam - fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs => - val (startOffset, len) = queryParam.label.schemaVersion match { - case HBaseType.VERSION4 => - val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset - (offset, queryParam.limit) - case _ => (0, kvs.length) - } - - toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len) - } - } - - val queryParam = queryRequest.queryParam - val cacheTTL = queryParam.cacheTTLInMillis - /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + * since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase. + * to fix version conflict, make this as lazy val for clients that don't require hbase client. + */ + val client = AsynchbaseStorage.makeClient(config) + val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0") + val clients = Seq(client, clientWithFlush) - val edge = toRequestEdge(queryRequest, parentEdges) - val request = buildRequest(queryRequest, edge) + override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients) - val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) - val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes) + override val mutator: StorageWritable = new AsynchbaseStorageWritable(client, clientWithFlush) - if (cacheTTL <= 0) fetchInner(request) - else { - val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey) + override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) -// val cacheKeyBytes = toCacheKeyBytes(request) - val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) - } - } - - override def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { - val defers: Seq[Deferred[StepResult]] = for { - queryRequest <- queryRequests - } yield { - val queryOption = queryRequest.query.queryOption - val queryParam = queryRequest.queryParam - val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent - val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil - fetch(queryRequest, isInnerCall = false, parentEdges) - } - - val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers) - grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] => - queryResults.toSeq - }.toFuture(emptyStepResult) - } - - - def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] = { - val edge = toRequestEdge(request, Nil) - fetchKeyValues(buildRequest(request, edge)) - } - - - def fetchVertexKeyValues(request: AsyncRPC): Future[Seq[SKeyValue]] = fetchKeyValues(request) + override val fetcher: StorageReadable[AsyncRPC] = new AsynchbaseStorageReadable(graph, config, client, serDe, io) - /** - * when withWait is given, we use client with flushInterval set to 0. - * if we are not using this, then we are adding extra wait time as much as flushInterval in worst case. 
- * - * @param edges - * @param withWait - * @return - */ - override def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { - - val _client = client(withWait) - val defers: Seq[Deferred[(Boolean, Long, Long)]] = for { - edge <- edges - } yield { - val futures: List[Deferred[(Boolean, Long, Long)]] = for { - relEdge <- edge.relatedEdges - edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) - } yield { - val countWithTs = edge.propertyValueInner(LabelMeta.count) - val countVal = countWithTs.innerVal.toString().toLong - val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head - val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)) - val fallbackFn: (Exception => (Boolean, Long, Long)) = { ex => - logger.error(s"mutation failed. $request", ex) - (false, -1L, -1L) - } - val defer = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long => - (true, resultCount.longValue(), countVal) - } - if (withWait) defer - else Deferred.fromResult((true, -1L, -1L)) - } - - val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.group(futures) - grouped.map(new util.ArrayList[(Boolean, Long, Long)]()) { resultLs => resultLs.head } - } - - val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.groupInOrder(defers) - grouped.toFuture(new util.ArrayList[(Boolean, Long, Long)]()).map(_.toSeq) - } - - - override def flush(): Unit = clients.foreach { client => - super.flush() - val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS) - Await.result(client.flush().toFuture(new AnyRef), timeout) - } + // val hbaseExecutor: ExecutorService = + // if (config.getString("hbase.zookeeper.quorum") == "localhost") + // AsynchbaseStorage.initLocalHBase(config) + // else + // null - override def shutdown(): Unit = { - flush() - clients.foreach { client => - AsynchbaseStorage.shutdown(client) - } - if (hbaseExecutor != null) { - hbaseExecutor.shutdown() - hbaseExecutor.awaitTermination(1, TimeUnit.MINUTES) - } - } - - override def createTable(_zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String, - replicationScopeOpt: Option[Int] = None, - totalRegionCount: Option[Int] = None): Unit = { - /* TODO: Decide if we will allow each app server to connect to multiple hbase cluster */ - for { - zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq - } { - logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") - withAdmin(zkAddr) { admin => - val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier) - try { - if (!admin.tableExists(TableName.valueOf(tableName))) { - val desc = new HTableDescriptor(TableName.valueOf(tableName)) - desc.setDurability(Durability.ASYNC_WAL) - for (cf <- cfs) { - val columnDesc = new HColumnDescriptor(cf) - .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) - .setBloomFilterType(BloomType.ROW) - .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) - .setMaxVersions(1) - .setTimeToLive(2147483647) - .setMinVersions(0) - .setBlocksize(32768) - .setBlockCacheEnabled(true) - // FIXME: For test!! 
- .setInMemory(true) - if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) - if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get) - desc.addFamily(columnDesc) - } - - if (regionCount <= 1) admin.createTable(desc) - else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) - } else { - logger.info(s"$zkAddr, $tableName, $cfs already exist.") - } - } catch { - case e: Throwable => - logger.error(s"$zkAddr, $tableName failed with $e", e) - throw e - } - } - } - } - - override def truncateTable(zkAddr: String, tableNameStr: String): Unit = { - withAdmin(zkAddr) { admin => - val tableName = TableName.valueOf(tableNameStr) - if (!Try(admin.tableExists(tableName)).getOrElse(false)) { - logger.info(s"No table to truncate ${tableNameStr}") - return - } - - Try(admin.isTableDisabled(tableName)).map { - case true => - logger.info(s"${tableNameStr} is already disabled.") - - case false => - logger.info(s"Before disabling to trucate ${tableNameStr}") - Try(admin.disableTable(tableName)).recover { - case NonFatal(e) => - logger.info(s"Failed to disable ${tableNameStr}: ${e}") - } - logger.info(s"After disabling to trucate ${tableNameStr}") - } - - logger.info(s"Before truncating ${tableNameStr}") - Try(admin.truncateTable(tableName, true)).recover { - case NonFatal(e) => - logger.info(s"Failed to truncate ${tableNameStr}: ${e}") - } - logger.info(s"After truncating ${tableNameStr}") - Try(admin.close()).recover { - case NonFatal(e) => - logger.info(s"Failed to close admin ${tableNameStr}: ${e}") - } - Try(admin.getConnection.close()).recover { - case NonFatal(e) => - logger.info(s"Failed to close connection ${tableNameStr}: ${e}") - } - } - } - - override def deleteTable(zkAddr: String, tableNameStr: String): Unit = { - withAdmin(zkAddr) { admin => - val tableName = TableName.valueOf(tableNameStr) - if (!admin.tableExists(tableName)) { - return - } - if (admin.isTableEnabled(tableName)) { - admin.disableTable(tableName) - } - admin.deleteTable(tableName) - } - } - - /** Asynchbase implementation override default getVertices to use future Cache */ - override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { - def fromResult(kvs: Seq[SKeyValue], - version: String): Option[S2Vertex] = { - if (kvs.isEmpty) None - else vertexDeserializer.fromKeyValues(kvs, None) - } - - val futures = vertices.map { vertex => - val kvs = vertexSerializer(vertex).toKeyValues - val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) - // get.setTimeout(this.singleGetTimeout.toShort) - get.setFailfast(true) - get.maxVersions(1) - - fetchVertexKeyValues(Left(get)).map { kvs => - fromResult(kvs, vertex.serviceColumn.schemaVersion) - } -// val cacheKey = MurmurHash3.stringHash(get.toString) -// vertexCache.getOrElseUpdate(cacheKey, cacheTTL = -1)(fetchVertexKeyValues(Left(get))).map { kvs => -// fromResult(kvs, vertex.serviceColumn.schemaVersion) -// } - } - - Future.sequence(futures).map { result => result.toList.flatten } - } - - //TODO: Limited to 100000 edges per hbase table. fix this later. 
- override def fetchEdgesAll(): Future[Seq[S2Edge]] = { - val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => - val distinctLabels = labels.toSet - val scan = AsynchbasePatcher.newScanner(client, hTableName) - scan.setFamily(Serializable.edgeCf) - scan.setMaxVersions(1) - - scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { - case null => Seq.empty - case kvsLs => - kvsLs.flatMap { kvs => - kvs.flatMap { kv => - val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) - - indexEdgeDeserializer.fromKeyValues(Seq(kv), None) - .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree) - } - } - } - } - - Future.sequence(futures).map(_.flatten) - } - - override def fetchVerticesAll(): Future[Seq[S2Vertex]] = { - val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => - val distinctColumns = columns.toSet - val scan = AsynchbasePatcher.newScanner(client, hTableName) - scan.setFamily(Serializable.vertexCf) - scan.setMaxVersions(1) - - scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { - case null => Seq.empty - case kvsLs => - kvsLs.flatMap { kvs => - vertexDeserializer.fromKeyValues(kvs, None) - .filter(v => distinctColumns(v.serviceColumn)) - } - } - } - Future.sequence(futures).map(_.flatten) - } - - class V4ResultHandler(scanner: Scanner, defer: Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] { - val results = new util.ArrayList[KeyValue]() - var offsetCount = 0 - - override def call(kvsLs: util.ArrayList[util.ArrayList[KeyValue]]): Object = { - try { - if (kvsLs == null) { - defer.callback(results) - Try(scanner.close()) - } else { - val curRet = new util.ArrayList[KeyValue]() - kvsLs.foreach(curRet.addAll(_)) - val prevOffset = offsetCount - offsetCount += curRet.size() - - val nextRet = if(offsetCount > offset){ - if(prevOffset < offset ) { - curRet.subList(offset - prevOffset, curRet.size()) - } else{ - curRet - } - } else{ - emptyKeyValues - } - - val needCount = limit - results.size() - if (needCount >= nextRet.size()) { - results.addAll(nextRet) - } else { - results.addAll(nextRet.subList(0, needCount)) - } - - if (results.size() < limit) { - scanner.nextRows().addCallback(this) - } else { - defer.callback(results) - Try(scanner.close()) - } - } - } catch{ - case ex: Exception => - logger.error(s"fetchKeyValuesInner failed.", ex) - defer.callback(ex) - Try(scanner.close()) - } - } - } - - /** - * Private Methods which is specific to Asynchbase implementation. - */ - private def fetchKeyValuesInner(rpc: AsyncRPC): Deferred[util.ArrayList[KeyValue]] = { - rpc match { - case Left(get) => client.get(get) - case Right(ScanWithRange(scanner, offset, limit)) => - val deferred = new Deferred[util.ArrayList[KeyValue]]() - scanner.nextRows().addCallback(new V4ResultHandler(scanner, deferred, offset, limit)) - deferred - case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc")) - } - } - - private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = { - /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ - hbaseRpc match { - case Left(getRequest) => getRequest.key - case Right(ScanWithRange(scanner, offset, limit)) => - Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))) - case _ => - logger.error(s"toCacheKeyBytes failed. not supported class type. 
$hbaseRpc") - throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") - } - } - - private def getSecureClusterAdmin(zkAddr: String) = { - val jaas = config.getString("java.security.auth.login.config") - val krb5Conf = config.getString("java.security.krb5.conf") - val realm = config.getString("realm") - val principal = config.getString("principal") - val keytab = config.getString("keytab") - - System.setProperty("java.security.auth.login.config", jaas) - System.setProperty("java.security.krb5.conf", krb5Conf) - // System.setProperty("sun.security.krb5.debug", "true") - // System.setProperty("sun.security.spnego.debug", "true") - val conf = new Configuration(true) - val hConf = HBaseConfiguration.create(conf) - - hConf.set("hbase.zookeeper.quorum", zkAddr) - - hConf.set("hadoop.security.authentication", "Kerberos") - hConf.set("hbase.security.authentication", "Kerberos") - hConf.set("hbase.master.kerberos.principal", "hbase/_HOST@" + realm) - hConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@" + realm) - - System.out.println("Connecting secure cluster, using keytab\n") - UserGroupInformation.setConfiguration(hConf) - UserGroupInformation.loginUserFromKeytab(principal, keytab) - val currentUser = UserGroupInformation.getCurrentUser() - System.out.println("current user : " + currentUser + "\n") - - // get table list - val conn = ConnectionFactory.createConnection(hConf) - conn.getAdmin - } - - private def withAdmin(zkAddr: String)(op: Admin => Unit): Unit = { - val admin = getAdmin(zkAddr) - try { - op(admin) - } finally { - admin.close() - admin.getConnection.close() - } - } - /** - * following configuration need to come together to use secured hbase cluster. - * 1. set hbase.security.auth.enable = true - * 2. set file path to jaas file java.security.auth.login.config - * 3. set file path to kerberos file java.security.krb5.conf - * 4. set realm - * 5. set principal - * 6. 
set file path to keytab - * @param zkAddr - * @return - */ - private def getAdmin(zkAddr: String) = { - if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) { - getSecureClusterAdmin(zkAddr) - } else { - val conf = HBaseConfiguration.create() - conf.set("hbase.zookeeper.quorum", zkAddr) - val conn = ConnectionFactory.createConnection(conf) - conn.getAdmin - } - } - - private def enableTable(zkAddr: String, tableName: String) = { - withAdmin(zkAddr) { admin => - admin.enableTable(TableName.valueOf(tableName)) - } - } - - private def disableTable(zkAddr: String, tableName: String) = { - withAdmin(zkAddr) { admin => - admin.disableTable(TableName.valueOf(tableName)) - } - } - - private def dropTable(zkAddr: String, tableName: String) = { - withAdmin(zkAddr) { admin => - admin.disableTable(TableName.valueOf(tableName)) - admin.deleteTable(TableName.valueOf(tableName)) - } - } - - private def getStartKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount)) - } - - private def getEndKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala new file mode 100644 index 0000000..c55c6c7 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageManagement.scala @@ -0,0 +1,263 @@ +package org.apache.s2graph.core.storage.hbase + +import java.util.concurrent.{Executors, TimeUnit} + +import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} +import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability} +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.regionserver.BloomType +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.security.UserGroupInformation +import org.apache.s2graph.core.storage.StorageManagement +import org.apache.s2graph.core.utils.{Extensions, logger} +import org.hbase.async.HBaseClient + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, duration} +import scala.util.Try +import scala.util.control.NonFatal + +object AsynchbaseStorageManagement { + /* Secure cluster */ + val SecurityAuthEnabled = "hbase.security.auth.enable" + val Jaas = "java.security.auth.login.config" + val Krb5Conf = "java.security.krb5.conf" + val Realm = "realm" + val Principal = "principal" + val Keytab = "keytab" + val HadoopAuthentication = "hadoop.security.authentication" + val HBaseAuthentication = "hbase.security.authentication" + val MasterKerberosPrincipal = "hbase.master.kerberos.principal" + val RegionServerKerberosPrincipal = "hbase.regionserver.kerberos.principal" + + + val DefaultCreateTableOptions = Map( + "hbase.zookeeper.quorum" -> "localhost" + ) +} + +class AsynchbaseStorageManagement(val config: Config, val clients: Seq[HBaseClient]) extends StorageManagement { + import 
org.apache.s2graph.core.Management._ + import AsynchbaseStorageManagement._ + import Extensions.DeferOps + + /** + * Asynchbase client setup. + * note that we need two client, one for bulk(withWait=false) and another for withWait=true + */ + private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort + + /** + * this method need to be called when client shutdown. this is responsible to cleanUp the resources + * such as client into storage. + */ + override def flush(): Unit = clients.foreach { client => + implicit val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + + val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS) + Await.result(client.flush().toFuture(new AnyRef), timeout) + } + + def getOption[T](config: Config, key: String): Option[T] = { + import scala.util._ + Try { config.getAnyRef(key).asInstanceOf[T] }.toOption + } + /** + * create table on storage. + * if storage implementation does not support namespace or table, then there is nothing to be done + * + * @param config + */ + override def createTable(config: Config, tableNameStr: String): Unit = { + val zkAddr = config.getString(ZookeeperQuorum) + + withAdmin(config) { admin => + val regionMultiplier = getOption[Int](config, RegionMultiplier).getOrElse(0) + val regionCount = getOption[Int](config, TotalRegionCount).getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier) + val cfs = getOption[Seq[String]](config, ColumnFamilies).getOrElse(DefaultColumnFamilies) + val compressionAlgorithm = getOption[String](config, CompressionAlgorithm).getOrElse(DefaultCompressionAlgorithm) + val ttl = getOption[Int](config, Ttl) + val replicationScoreOpt = getOption[Int](config, ReplicationScope) + + val tableName = TableName.valueOf(tableNameStr) + try { + if (!admin.tableExists(tableName)) { + val desc = new HTableDescriptor(tableName) + desc.setDurability(Durability.ASYNC_WAL) + for (cf <- cfs) { + val columnDesc = new HColumnDescriptor(cf) + .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) + .setBloomFilterType(BloomType.ROW) + .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) + .setMaxVersions(1) + .setTimeToLive(2147483647) + .setMinVersions(0) + .setBlocksize(32768) + .setBlockCacheEnabled(true) + // FIXME: For test!! 
+ .setInMemory(true) + + ttl.foreach(columnDesc.setTimeToLive(_)) + replicationScoreOpt.foreach(columnDesc.setScope(_)) + + desc.addFamily(columnDesc) + } + + if (regionCount <= 1) admin.createTable(desc) + else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) + } else { + logger.info(s"$zkAddr, $tableName, $cfs already exist.") + } + } catch { + case e: Throwable => + logger.error(s"$zkAddr, $tableName failed with $e", e) + throw e + } + } + } + + /** + * + * @param config + * @param tableNameStr + */ + override def truncateTable(config: Config, tableNameStr: String): Unit = { + withAdmin(config) { admin => + val tableName = TableName.valueOf(tableNameStr) + if (!Try(admin.tableExists(tableName)).getOrElse(false)) { + logger.info(s"No table to truncate ${tableNameStr}") + return + } + + Try(admin.isTableDisabled(tableName)).map { + case true => + logger.info(s"${tableNameStr} is already disabled.") + + case false => + logger.info(s"Before disabling to trucate ${tableNameStr}") + Try(admin.disableTable(tableName)).recover { + case NonFatal(e) => + logger.info(s"Failed to disable ${tableNameStr}: ${e}") + } + logger.info(s"After disabling to trucate ${tableNameStr}") + } + + logger.info(s"Before truncating ${tableNameStr}") + Try(admin.truncateTable(tableName, true)).recover { + case NonFatal(e) => + logger.info(s"Failed to truncate ${tableNameStr}: ${e}") + } + logger.info(s"After truncating ${tableNameStr}") + Try(admin.close()).recover { + case NonFatal(e) => + logger.info(s"Failed to close admin ${tableNameStr}: ${e}") + } + Try(admin.getConnection.close()).recover { + case NonFatal(e) => + logger.info(s"Failed to close connection ${tableNameStr}: ${e}") + } + } + } + + /** + * + * @param config + * @param tableNameStr + */ + override def deleteTable(config: Config, tableNameStr: String): Unit = { + withAdmin(config) { admin => + val tableName = TableName.valueOf(tableNameStr) + if (!admin.tableExists(tableName)) { + return + } + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName) + } + admin.deleteTable(tableName) + } + } + + /** + * + */ + override def shutdown(): Unit = { + flush() + clients.foreach { client => + AsynchbaseStorage.shutdown(client) + } + } + + + + private def getSecureClusterAdmin(config: Config) = { + val zkAddr = config.getString(ZookeeperQuorum) + val realm = config.getString(Realm) + val principal = config.getString(Principal) + val keytab = config.getString(Keytab) + + System.setProperty(Jaas, config.getString(Jaas)) + System.setProperty(Krb5Conf, config.getString(Krb5Conf)) + + + val conf = new Configuration(true) + val hConf = HBaseConfiguration.create(conf) + + hConf.set(ZookeeperQuorum, zkAddr) + + hConf.set(HadoopAuthentication, "Kerberos") + hConf.set(HBaseAuthentication, "Kerberos") + hConf.set(MasterKerberosPrincipal, "hbase/_HOST@" + realm) + hConf.set(RegionServerKerberosPrincipal, "hbase/_HOST@" + realm) + + System.out.println("Connecting secure cluster, using keytab\n") + UserGroupInformation.setConfiguration(hConf) + UserGroupInformation.loginUserFromKeytab(principal, keytab) + val currentUser = UserGroupInformation.getCurrentUser() + System.out.println("current user : " + currentUser + "\n") + + // get table list + val conn = ConnectionFactory.createConnection(hConf) + conn.getAdmin + } + + private def withAdmin(config: Config)(op: Admin => Unit): Unit = { + val admin = getAdmin(config) + try { + op(admin) + } finally { + admin.close() + admin.getConnection.close() + } + } + /** + * following 
configuration need to come together to use secured hbase cluster. + * 1. set hbase.security.auth.enable = true + * 2. set file path to jaas file java.security.auth.login.config + * 3. set file path to kerberos file java.security.krb5.conf + * 4. set realm + * 5. set principal + * 6. set file path to keytab + */ + private def getAdmin(config: Config) = { + if (config.hasPath(SecurityAuthEnabled) && config.getBoolean(SecurityAuthEnabled)) { + getSecureClusterAdmin(config) + } else { + val zkAddr = config.getString(ZookeeperQuorum) + val conf = HBaseConfiguration.create() + conf.set(ZookeeperQuorum, zkAddr) + val conn = ConnectionFactory.createConnection(conf) + conn.getAdmin + } + } + + private def getStartKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount)) + } + + private def getEndKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala new file mode 100644 index 0000000..4ef95b8 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -0,0 +1,335 @@ +package org.apache.s2graph.core.storage.hbase + +import java.util +import java.util.Base64 + +import com.stumbleupon.async.Deferred +import com.typesafe.config.Config +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} +import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.storage.serde._ +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange} +import org.apache.s2graph.core.types.{HBaseType, VertexId} +import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger} +import org.hbase.async.FilterList.Operator.MUST_PASS_ALL +import org.hbase.async._ + +import scala.collection.JavaConversions._ +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseStorageReadable(val graph: S2Graph, + val config: Config, + val client: HBaseClient, + val serDe: StorageSerDe, + override val io: StorageIO) extends StorageReadable[AsyncRPC] { + import Extensions.DeferOps + import CanDefer._ + + private val emptyKeyValues = new util.ArrayList[KeyValue]() + private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]() + private val emptyStepResult = new util.ArrayList[StepResult]() + + /** Future Cache to squash request */ + lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true) + /** v4 max next row size */ + private val v4_max_num_rows = 10000 + private def getV4MaxNumRows(limit : Int): Int = { + if (limit < v4_max_num_rows) limit + else v4_max_num_rows + } + + /** + * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues. + * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build + * client request(GetRequest, Scanner) based on user provided query. 
+ * + * @param queryRequest + * @return + */ + override def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = { + import Serializable._ + val queryParam = queryRequest.queryParam + val label = queryParam.label + + val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + val snapshotEdge = edge.toSnapshotEdge + serDe.snapshotEdgeSerializer(snapshotEdge) + } else { + val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq) + serDe.indexEdgeSerializer(indexEdge) + } + + val rowKey = serializer.toRowKey + val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue)) + + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) + + label.schemaVersion match { + case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty => + val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName) + scanner.setFamily(edgeCf) + + /* + * TODO: remove this part. + */ + val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq) + val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) + + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + + val (startKey, stopKey) = + if (queryParam.intervalOpt.isDefined) { + // interval is set. + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => Bytes.add(baseKey, intervalMaxBytes) + } + (_startKey , Bytes.add(baseKey, intervalMinBytes)) + } else { + /* + * note: since propsToBytes encode size of property map at first byte, we are sure about max value here + */ + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => baseKey + } + (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) + } + + scanner.setStartKey(startKey) + scanner.setStopKey(stopKey) + + if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") + + scanner.setMaxVersions(1) + // TODO: exclusive condition innerOffset with cursorOpt + if (queryParam.cursorOpt.isDefined) { + scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit)) + } else { + scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit)) + } + scanner.setMaxTimestamp(maxTs) + scanner.setMinTimestamp(minTs) + scanner.setRpcTimeout(queryParam.rpcTimeout) + + // SET option for this rpc properly. 
+ if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit)) + else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit)) + + case _ => + val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier) + } else { + new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf) + } + + get.maxVersions(1) + get.setFailfast(true) + get.setMinTimestamp(minTs) + get.setMaxTimestamp(maxTs) + get.setTimeout(queryParam.rpcTimeout) + + val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset) + val columnRangeFilterOpt = queryParam.intervalOpt.map { interval => + new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true) + } + get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL)) + Left(get) + } + } + + /** + * + * @param queryRequest + * @param vertex + * @return + */ + override def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = { + val kvs = serDe.vertexSerializer(vertex).toKeyValues + val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf) + // get.setTimeout(this.singleGetTimeout.toShort) + get.setFailfast(true) + get.maxVersions(1) + Left(get) + } + + /** + * responsible to fire parallel fetch call into storage and create future that will return merged result. + * + * @param queryRequests + * @param prevStepEdges + * @return + */ + override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = { + val defers: Seq[Deferred[StepResult]] = for { + queryRequest <- queryRequests + } yield { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent + val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil + fetch(queryRequest, isInnerCall = false, parentEdges) + } + + val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers) + grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] => + queryResults.toSeq + }.toFuture(emptyStepResult) + } + + override def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = { + val defer = fetchKeyValuesInner(rpc) + defer.toFuture(emptyKeyValues).map { kvsArr => + kvsArr.map { kv => + implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + } + } + } + + override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = { + val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => + val distinctLabels = labels.toSet + val scan = AsynchbasePatcher.newScanner(client, hTableName) + scan.setFamily(Serializable.edgeCf) + scan.setMaxVersions(1) + + scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { + case null => Seq.empty + case kvsLs => + kvsLs.flatMap { kvs => + kvs.flatMap { kv => + val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree) + } + } + } + } + + Future.sequence(futures).map(_.flatten) + } + + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) => + 
val distinctColumns = columns.toSet + val scan = AsynchbasePatcher.newScanner(client, hTableName) + scan.setFamily(Serializable.vertexCf) + scan.setMaxVersions(1) + + scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { + case null => Seq.empty + case kvsLs => + kvsLs.flatMap { kvs => + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs, None) + .filter(v => distinctColumns(v.serviceColumn)) + } + } + } + Future.sequence(futures).map(_.flatten) + } + + + /** + * we are using future cache to squash requests into same key on storage. + * + * @param queryRequest + * @param isInnerCall + * @param parentEdges + * @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future. + * seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback + */ + private def fetch(queryRequest: QueryRequest, + isInnerCall: Boolean, + parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = { + + def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = { + val prevStepScore = queryRequest.prevStepScore + val fallbackFn: (Exception => StepResult) = { ex => + logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex) + StepResult.Failure + } + + val queryParam = queryRequest.queryParam + fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs => + val (startOffset, len) = queryParam.label.schemaVersion match { + case HBaseType.VERSION4 => + val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset + (offset, queryParam.limit) + case _ => (0, kvs.length) + } + + io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len) + } + } + + val queryParam = queryRequest.queryParam + val cacheTTL = queryParam.cacheTTLInMillis + /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + + val edge = Storage.toRequestEdge(graph)(queryRequest, parentEdges) + val request = buildRequest(queryRequest, edge) + + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) + val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes) + + if (cacheTTL <= 0) fetchInner(request) + else { + val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey) + + // val cacheKeyBytes = toCacheKeyBytes(request) + val cacheKey = queryParam.toCacheKey(cacheKeyBytes) + futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) + } + } + + /** + * Private Methods which is specific to Asynchbase implementation. + */ + private def fetchKeyValuesInner(rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = { + rpc match { + case Left(get) => client.get(get) + case Right(ScanWithRange(scanner, offset, limit)) => + val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex => + logger.error(s"fetchKeyValuesInner failed.", ex) + scanner.close() + emptyKeyValues + } + + scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs => + val ls = new util.ArrayList[KeyValue] + if (kvsLs == null) { + } else { + kvsLs.foreach { kvs => + if (kvs != null) kvs.foreach { kv => ls.add(kv) } + else { + + } + } + } + + scanner.close() + val toIndex = Math.min(ls.size(), offset + limit) + new util.ArrayList[KeyValue](ls.subList(offset, toIndex)) + } + case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. 
$rpc")) + } + } + + private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = { + /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ + hbaseRpc match { + case Left(getRequest) => getRequest.key + case Right(ScanWithRange(scanner, offset, limit)) => + Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))) + case _ => + logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc") + throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala new file mode 100644 index 0000000..ab1ff19 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageSerDe.scala @@ -0,0 +1,68 @@ +package org.apache.s2graph.core.storage.hbase + +import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.{IndexEdge, S2Graph, S2Vertex, SnapshotEdge} +import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe, serde} + +class AsynchbaseStorageSerDe(val graph: S2Graph) extends StorageSerDe { + import org.apache.s2graph.core.types.HBaseType._ + + /** + * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue] + * so we can store this kvs. + * + * @param snapshotEdge : snapshotEdge to serialize + * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue] + */ + override def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = { + snapshotEdge.schemaVer match { + // case VERSION1 | + case VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge) + case VERSION3 | VERSION4 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge) + case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}") + } + } + + /** + * create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue] + * + * @param indexEdge : indexEdge to serialize + * @return serializer implementation + */ + override def indexEdgeSerializer(indexEdge: IndexEdge) = { + indexEdge.schemaVer match { + // case VERSION1 + case VERSION2 | VERSION3 => new serde.indexedge.wide.IndexEdgeSerializable(indexEdge) + case VERSION4 => new serde.indexedge.tall.IndexEdgeSerializable(indexEdge) + case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}") + } + } + + /** + * create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue] + * + * @param vertex : vertex to serialize + * @return serializer implementation + */ + override def vertexSerializer(vertex: S2Vertex) = new serde.vertex.wide.VertexSerializable(vertex) + + /** + * create deserializer that can parse stored CanSKeyValue into snapshotEdge. + * note that each storage implementation should implement implicit type class + * to convert storage dependent dataType into common SKeyValue type by implementing CanSKeyValue + * + * ex) Asynchbase use it's KeyValue class and CanSKeyValue object has implicit type conversion method. 
+ * if any storage uses a different class to represent stored byte arrays, + * then that storage implementation is responsible for providing an implicit type conversion method on CanSKeyValue. + **/ + private val snapshotEdgeDeserializable = new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph) + override def snapshotEdgeDeserializer(schemaVer: String) = snapshotEdgeDeserializable + + /** create deserializer that can parse stored CanSKeyValue into indexEdge. */ + private val indexEdgeDeserializer = new serde.indexedge.tall.IndexEdgeDeserializable(graph) + override def indexEdgeDeserializer(schemaVer: String) = indexEdgeDeserializer + + /** create deserializer that can parse stored CanSKeyValue into vertex. */ + private val vertexDeserializer = new serde.vertex.wide.VertexDeserializable(graph) + override def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] = vertexDeserializer +}
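[Editor's note] The SerDe comment above relies on a type class (CanSKeyValue) to bridge each storage client's own key-value class and the common SKeyValue shape that the generic deserializers consume. The snippet below is a minimal, self-contained sketch of that pattern; SKV, CanSKV and BackendKeyValue are illustrative stand-in names, not the project's actual SKeyValue/CanSKeyValue API.

object CanSKeyValueSketch {
  // Common key-value shape every backend converts into (stand-in for SKeyValue).
  final case class SKV(row: Array[Byte], qualifier: Array[Byte], value: Array[Byte], timestamp: Long)

  // The type class: how to turn one storage client's KV type into the common shape.
  trait CanSKV[T] {
    def toSKV(kv: T): SKV
  }

  // A storage client's own result type (stand-in for Asynchbase's KeyValue).
  final case class BackendKeyValue(key: Array[Byte], qualifier: Array[Byte], value: Array[Byte], ts: Long)

  // The implicit instance each storage implementation is responsible for providing.
  implicit val backendCanSKV: CanSKV[BackendKeyValue] = new CanSKV[BackendKeyValue] {
    def toSKV(kv: BackendKeyValue): SKV = SKV(kv.key, kv.qualifier, kv.value, kv.ts)
  }

  // Generic deserialization code can stay storage-agnostic.
  def toCommon[T](kvs: Seq[T])(implicit ev: CanSKV[T]): Seq[SKV] = kvs.map(ev.toSKV)
}

A deserializer written against the common shape then works for any backend that ships such an instance, which is the contract the scaladoc describes.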

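[Editor's note] For the secured-cluster path described in the getAdmin scaladoc earlier, all of the listed settings have to arrive in one Config object. Below is a rough sketch of assembling such a Config with Typesafe Config. Only hbase.security.auth.enable, java.security.auth.login.config and java.security.krb5.conf are named in the scaladoc itself; the remaining key strings stand in for the ZookeeperQuorum/Realm/Principal/Keytab constants, whose real values are defined elsewhere in s2core and may differ.

import com.typesafe.config.{Config, ConfigFactory}

object SecureAdminConfigSketch {
  // Hypothetical key names; see the note above about which keys are assumptions.
  val secureConf: Config = ConfigFactory.parseString(
    """
      |hbase.zookeeper.quorum = "zk1.example.com,zk2.example.com"
      |hbase.security.auth.enable = true
      |java.security.auth.login.config = "/etc/s2graph/jaas.conf"
      |java.security.krb5.conf = "/etc/krb5.conf"
      |realm = "EXAMPLE.COM"
      |principal = "s2graph@EXAMPLE.COM"
      |keytab = "/etc/security/keytabs/s2graph.keytab"
      |""".stripMargin)

  // When the auth-enable flag is true, getAdmin(secureConf) would take the
  // getSecureClusterAdmin branch, which logs in via
  // UserGroupInformation.loginUserFromKeytab(principal, keytab) before creating the Admin.
}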