http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala deleted file mode 100644 index d29ccce..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.rocks - -import java.util.concurrent.locks.ReentrantLock - -import com.google.common.cache.{Cache, LoadingCache} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.S2GraphLike -import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.utils.logger -import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions} - -import scala.concurrent.{ExecutionContext, Future} - -class RocksStorageWritable(val graph: S2GraphLike, - val serDe: StorageSerDe, - val reader: StorageReadable, - val db: RocksDB, - val vdb: RocksDB, - val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) { - - override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { - if (kvs.isEmpty) { - Future.successful(MutateResponse.Success) - } else { - val ret = { - val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf)) - val writeBatchV = buildWriteBatch(kvsV) - val writeBatchE = buildWriteBatch(kvsE) - val writeOptions = new WriteOptions - try { - vdb.write(writeOptions, writeBatchV) - db.write(writeOptions, writeBatchE) - true - } catch { - case e: Exception => - logger.error(s"writeAsyncSimple failed.", e) - false - } finally { - writeBatchV.close() - writeBatchE.close() - writeOptions.close() - } - } - - Future.successful(new MutateResponse(ret)) - } - } - - - override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = { - def op = { - val writeOptions = new WriteOptions - try { - val fetchedValue = db.get(requestKeyValue.row) - val innerRet = expectedOpt match { - case None => - if (fetchedValue == null) { - - db.put(writeOptions, requestKeyValue.row, requestKeyValue.value) - true - } else { - false - } - case Some(kv) => - if (fetchedValue == null) { - false - } else { - if (Bytes.compareTo(fetchedValue, kv.value) == 0) { - db.put(writeOptions, requestKeyValue.row, requestKeyValue.value) - true - } else { - false - } - } - } - - Future.successful(new MutateResponse(innerRet)) - } catch { - case e: RocksDBException => - logger.error(s"Write lock failed", e) - Future.successful(MutateResponse.Failure) - } finally { - writeOptions.close() - } - } - - withLock(requestKeyValue.row)(op) - } - - private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = { - val writeBatch = new WriteBatch() - kvs.foreach { kv => - kv.operation match { - case SKeyValue.Put => writeBatch.put(kv.row, kv.value) - case SKeyValue.Delete => writeBatch.remove(kv.row) - case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value) - case _ => throw new RuntimeException(s"not supported rpc operation. ${kv.operation}") - } - } - writeBatch - } - - private def withLock[A](key: Array[Byte])(op: => A): A = { - val lockKey = Bytes.toString(key) - val lock = lockMap.get(lockKey) - - try { - lock.lock - op - } finally { - lock.unlock() - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala new file mode 100644 index 0000000..20acfaa --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.rocks + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.schema.ServiceColumn +import org.apache.s2graph.core.{S2GraphLike, S2VertexLike, VertexBulkFetcher} +import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.HBaseType +import org.rocksdb.RocksDB + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +class RocksVertexBulkFetcher(val graph: S2GraphLike, + val config: Config, + val db: RocksDB, + val vdb: RocksDB, + val serDe: StorageSerDe, + val io: StorageIO) extends VertexBulkFetcher { + import RocksStorage._ + + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + import scala.collection.mutable + + val vertices = new ArrayBuffer[S2VertexLike]() + ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) => + val distinctColumns = columns.toSet + + val iter = vdb.newIterator() + val buffer = mutable.ListBuffer.empty[SKeyValue] + var oldVertexIdBytes = Array.empty[Byte] + var minusPos = 0 + + try { + iter.seekToFirst() + while (iter.isValid) { + val row = iter.key() + if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) { + if (buffer.nonEmpty) + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + + oldVertexIdBytes = row + minusPos = 1 + buffer.clear() + } + val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis()) + buffer += kv + + iter.next() + } + if (buffer.nonEmpty) + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + + } finally { + iter.close() + } + } + + Future.successful(vertices) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala new file mode 100644 index 0000000..6becd98 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.rocks + +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.rocksdb.RocksDB + +import scala.concurrent.{ExecutionContext, Future} + +class RocksVertexFetcher(val graph: S2GraphLike, + val config: Config, + val db: RocksDB, + val vdb: RocksDB, + val serDe: StorageSerDe, + val io: StorageIO) extends VertexFetcher { + private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + val rpc = RocksStorage.buildRequest(queryRequest, vertex) + + RocksStorage.fetchKeyValues(vdb, db, rpc) + } + + override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { + def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = { + if (kvs.isEmpty) Nil + else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq + } + + val futures = vertices.map { vertex => + val queryParam = QueryParam.Empty + val q = Query.toQuery(Seq(vertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + + fetchKeyValues(queryRequest, vertex).map { kvs => + fromResult(kvs, vertex.serviceColumn.schemaVersion) + } recoverWith { + case ex: Throwable => Future.successful(Nil) + } + } + + Future.sequence(futures).map(_.flatten) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala deleted file mode 100644 index 8cd32d4..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.s2graph.core.storage -import org.apache.s2graph.core._ -import org.apache.s2graph.core.schema.LabelMeta -import org.apache.s2graph.core.utils.logger - -import scala.concurrent.{ExecutionContext, Future} - -abstract class DefaultOptimisticMutator(graph: S2GraphLike, - serDe: StorageSerDe, - reader: StorageReadable) extends OptimisticMutator { - val fetcher = reader - - lazy val io: StorageIO = new StorageIO(graph, serDe) - lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader) - -// private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = -// mutator.writeToStorage(cluster, kvs, withWait) - - def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = { - if (stepInnerResult.isEmpty) Future.successful(true) - else { - val head = stepInnerResult.edgeWithScores.head - val zkQuorum = head.edge.innerLabel.hbaseZkAddr - val futures = for { - edgeWithScore <- stepInnerResult.edgeWithScores - } yield { - val edge = edgeWithScore.edge - - val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - - val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => - serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - io.buildIncrementsAsync(indexEdge, -1L) - } - - /* reverted direction */ - val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - io.buildIncrementsAsync(indexEdge, -1L) - } - - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - - writeToStorage(zkQuorum, mutations, withWait = true) - } - - Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } - } - } - - def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { - if (vertex.op == GraphUtil.operations("delete")) { - writeToStorage(zkQuorum, - serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. $vertex") - Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait) - } - } - - def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { - val mutations = _edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) - else S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - - if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) - io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - } - - writeToStorage(zkQuorum, mutations, withWait).map { ret => - _edges.zipWithIndex.map { case (edge, idx) => - idx -> ret.isSuccess - } - } - } - - def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { - def mutateEdgesInner(edges: Seq[S2EdgeLike], - checkConsistency: Boolean, - withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { - assert(edges.nonEmpty) - // TODO:: remove after code review: unreachable code - if (!checkConsistency) { - - val futures = edges.map { edge => - val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - val mutations = - io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - - if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) - - writeToStorage(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } - } else { - fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => - conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) - } - } - } - - val edgeWithIdxs = _edges.zipWithIndex - val grouped = edgeWithIdxs.groupBy { case (edge, idx) => - (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) - } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - // After deleteAll, process others - val mutateEdgeFutures = edges.toList match { - case head :: tail => - val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait) - - //TODO: decide what we will do on failure on vertex put - val puts = io.buildVertexPutsAsync(head) - val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - case Nil => Nil - } - - val composed = for { - // deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield mutateRet - - composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) } - } - - Future.sequence(mutateEdges).map { squashedRets => - squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) - } - } - - def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = { - val futures = for { - edge <- edges - } yield { - val kvs = for { - relEdge <- edge.relatedEdges - edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) - } yield { - val countWithTs = edge.propertyValueInner(LabelMeta.count) - val countVal = countWithTs.innerVal.toString().toLong - io.buildIncrementsCountAsync(edgeWithIndex, countVal).head - } - writeToStorage(zkQuorum, kvs, withWait = withWait) - } - - Future.sequence(futures) - } - - def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = { - val kvs = io.buildDegreePuts(edge, degreeVal) - - writeToStorage(zkQuorum, kvs, withWait = true) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala new file mode 100644 index 0000000..aa0c6b5 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/ImportStatus.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.utils + +import java.util.concurrent.atomic.AtomicInteger + +trait ImportStatus { + val done: AtomicInteger + + def isCompleted: Boolean + + def percentage: Int + + val total: Int +} + +class ImportRunningStatus(val total: Int) extends ImportStatus { + require(total > 0, s"Total should be positive: $total") + + val done = new AtomicInteger(0) + + def isCompleted: Boolean = total == done.get + + def percentage = 100 * done.get / total +} + +case object ImportDoneStatus extends ImportStatus { + val total = 1 + + val done = new AtomicInteger(1) + + def isCompleted: Boolean = true + + def percentage = 100 +} + +object ImportStatus { + def apply(total: Int): ImportStatus = new ImportRunningStatus(total) + + def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] = + Some((importResult.isCompleted, importResult.total, importResult.done.get)) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala new file mode 100644 index 0000000..300106a --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.utils + +import java.io.File + +import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike} +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +object Importer { + def toHDFSConfiguration(hdfsConfDir: String): Configuration = { + val conf = new Configuration + + val hdfsConfDirectory = new File(hdfsConfDir) + if (hdfsConfDirectory.exists()) { + if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) { + throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.") + } + + val path = hdfsConfDirectory.getAbsolutePath + conf.addResource(new Path(s"file:///$path/core-site.xml")) + conf.addResource(new Path(s"file:///$path/hdfs-site.xml")) + } else { + logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..") + } + conf + } +} + +trait Importer { + @volatile var isFinished: Boolean = false + + def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] + + def status: Boolean = isFinished + + def setStatus(otherStatus: Boolean): Boolean = { + this.isFinished = otherStatus + this.isFinished + } + + def close(): Unit +} + +case class IdentityImporter(graph: S2GraphLike) extends Importer { + override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + Future.successful(this) + } + + override def close(): Unit = {} +} + +object HDFSImporter { + + import scala.collection.JavaConverters._ + + val PathsKey = "paths" + val HDFSConfDirKey = "hdfsConfDir" + + def extractPaths(config: Config): Map[String, String] = { + config.getConfigList(PathsKey).asScala.map { e => + val key = e.getString("src") + val value = e.getString("tgt") + + key -> value + }.toMap + } +} + +case class HDFSImporter(graph: S2GraphLike) extends Importer { + + import HDFSImporter._ + + override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + Future { + val paths = extractPaths(config) + val hdfsConfiDir = config.getString(HDFSConfDirKey) + + val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir) + val fs = FileSystem.get(hadoopConfig) + + def copyToLocal(remoteSrc: String, localSrc: String): Unit = { + val remoteSrcPath = new Path(remoteSrc) + val localSrcPath = new Path(localSrc) + + fs.copyToLocalFile(remoteSrcPath, localSrcPath) + } + + paths.foreach { case (srcPath, tgtPath) => + copyToLocal(srcPath, tgtPath) + } + + this + } + } + + // override def status: ImportStatus = ??? + + override def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala new file mode 100644 index 0000000..6d95c93 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.fetcher + +import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.Integrate.IntegrateCommon +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.{Query, QueryParam} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} + +class EdgeFetcherTest extends IntegrateCommon { + + import scala.collection.JavaConverters._ + + test("MemoryModelFetcher") { + // 1. create label. + // 2. importLabel. + // 3. fetch. + val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get + val serviceColumn = + management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true))) + val labelName = "fetcher_test" + val options = + s"""{ + | + | "importer": { + | "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter" + | }, + | "fetcher": { + | "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelFetcher" + | } + |}""".stripMargin + + Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + + val label = management.createLabel( + labelName, + serviceColumn, + serviceColumn, + true, + service.serviceName, + Seq.empty[Index].asJava, + Seq.empty[Prop].asJava, + "strong", + null, + -1, + "v3", + "gz", + options + ) + val config = ConfigFactory.parseString(options) + val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) + Await.ready(importerFuture, Duration("60 seconds")) + + Thread.sleep(1000) + + val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon") + val queryParam = QueryParam(labelName = labelName) + + val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) + val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds")) + + stepResult.edgeWithScores.foreach { es => + println(es.edge) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala deleted file mode 100644 index 6c76cdf..0000000 --- a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.model - -import com.typesafe.config.ConfigFactory -import org.apache.s2graph.core.Integrate.IntegrateCommon -import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} -import org.apache.s2graph.core.schema.Label -import org.apache.s2graph.core.{Query, QueryParam} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext} - -class FetcherTest extends IntegrateCommon { - - import scala.collection.JavaConverters._ - - test("MemoryModelFetcher") { - // 1. create label. - // 2. importLabel. - // 3. fetch. - val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get - val serviceColumn = - management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true))) - val labelName = "fetcher_test" - val options = - s"""{ - | - | "importer": { - | "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter" - | }, - | "fetcher": { - | "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher" - | } - |}""".stripMargin - - Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } - - val label = management.createLabel( - labelName, - serviceColumn, - serviceColumn, - true, - service.serviceName, - Seq.empty[Index].asJava, - Seq.empty[Prop].asJava, - "strong", - null, - -1, - "v3", - "gz", - options - ) - val config = ConfigFactory.parseString(options) - val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) - Await.ready(importerFuture, Duration("60 seconds")) - - Thread.sleep(1000) - - val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon") - val queryParam = QueryParam(labelName = labelName) - - val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) - val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds")) - - stepResult.edgeWithScores.foreach { es => - println(es.edge) - } - } -}
