add test case for TransferToHFile.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/cd41b8f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/cd41b8f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/cd41b8f1

Branch: refs/heads/master
Commit: cd41b8f147394e226298f4e8efa696a3eed0976d
Parents: 4e3fd9c
Author: DO YUNG YOON <[email protected]>
Authored: Tue Feb 27 16:35:37 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Feb 27 16:56:18 2018 +0900

----------------------------------------------------------------------
 loader/build.sbt                                |   5 +-
 loader/loader.py                                |  30 ++-
 .../loader/subscriber/GraphSubscriber.scala     |  10 +
 .../loader/subscriber/TransferToHFile.scala     | 204 +++++++++++++------
 .../loader/subscriber/TransferToHFileTest.scala | 123 +++++++++++
 .../apache/s2graph/core/storage/SKeyValue.scala |   4 +
 .../s2graph/core/storage/StorageSerDe.scala     |   2 +-
 7 files changed, 306 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/build.sbt
----------------------------------------------------------------------
diff --git a/loader/build.sbt b/loader/build.sbt
index ac7d948..a93713a 100644
--- a/loader/build.sbt
+++ b/loader/build.sbt
@@ -29,14 +29,15 @@ projectDependencies := Seq(
 
 libraryDependencies ++= Seq(
   "com.google.guava" % "guava" % "12.0.1" force(), // use this old version of guava to avoid incompatibility
-  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+  "org.apache.spark" %% "spark-core" % sparkVersion % "provided" exclude("javax.servlet", "*"),
   "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
   "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
   "org.apache.httpcomponents" % "fluent-hc" % "4.2.5",
   "org.specs2" %% "specs2-core" % specs2Version % "test",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
-  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion
+  "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
+  "com.github.scopt" %% "scopt" % "3.7.0"
 )
 
 crossScalaVersions := Seq("2.10.6")


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/loader.py
----------------------------------------------------------------------
diff --git a/loader/loader.py b/loader/loader.py
index 1d4dc32..58447ff 100644
--- a/loader/loader.py
+++ b/loader/loader.py
@@ -30,8 +30,32 @@ def hfile(args):
 --name "TransferToHFile@shon" \
 --conf "spark.task.maxFailures=20" \
 --master yarn-cluster \
---num-executors %s --driver-memory 1g --executor-memory 2g --executor-cores 1 %s \
-%s /tmp/%s %s %s %s %s %s %s""" % (args["num_executors"], JAR, args["input"], args["htable_name"], args["hbase_zk"], args["htable_name"], args["db_url"], args["max_file_per_region"], args["label_mapping"], args["auto_create_edge"])
+--num-executors %s \
+--driver-memory 1g \
+--executor-memory 2g \
+--executor-cores 1 \
+%s \
+--input %s \
+--tmpPath /tmp/%s \
+--zkQuorum %s \
+--table %s \
+--dbUrl %s \
+--dbUser %s \
+--dbPassword %s \
+--maxHFilePerRegionServer %s \
+--labelMapping %s \
+--autoEdgeCreate %s""" % (args["num_executors"],
+                          JAR,
+                          args["input"],
+                          args["htable_name"],
+                          args["hbase_zk"],
+                          args["htable_name"],
+                          args["db_url"],
+                          args["db_user"],
+                          args["db_password"],
+                          args["max_file_per_region"],
+                          args["label_mapping"],
+                          args["auto_create_edge"])
 print cmd
 ret = os.system(cmd)
 print cmd, "return", ret
@@ -96,6 +120,8 @@ args = {
 "hbase_namenode": "hdfs://nameservice:8020",
 "hbase_zk": "localhost",
 "db_url": "jdbc:mysql://localhost:3306/graph_dev",
+"db_user": "graph",
+"db_password": "graph",
 "max_file_per_region": 1,
 "label_mapping": "none",
 "auto_create_edge": "false",
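For reference, loader.py now renders the job arguments as named flags instead of positional ones. With the defaults above, the generated spark-submit call comes out roughly as follows; the executor count, input path, table name, and jar name are illustrative placeholders, and the unchanged head of the command before --name is omitted:

    ... \
    --name "TransferToHFile@shon" \
    --conf "spark.task.maxFailures=20" \
    --master yarn-cluster \
    --num-executors 2 \
    --driver-memory 1g \
    --executor-memory 2g \
    --executor-cores 1 \
    s2graph-loader-assembly.jar \
    --input /user/test/bulk_edges.tsv \
    --tmpPath /tmp/s2graph \
    --zkQuorum localhost \
    --table s2graph \
    --dbUrl jdbc:mysql://localhost:3306/graph_dev \
    --dbUser graph \
    --dbPassword graph \
    --maxHFilePerRegionServer 1 \
    --labelMapping none \
    --autoEdgeCreate false

Everything after the jar path is handed to the new scopt parser in TransferToHFile.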
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 6ecb070..90e8bbb 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -76,6 +76,16 @@ object GraphSubscriberHelper extends WithKafka {
     }
   }
 
+  def apply(_config: Config): Unit = {
+    config = _config
+    if (g == null) {
+      val ec = ExecutionContext.Implicits.global
+      g = new S2Graph(config)(ec)
+      management = new Management(g)
+      builder = g.elementBuilder
+    }
+  }
+
   def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = {
     config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList))


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 4eb9898..0d72b9c 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -19,44 +19,103 @@
 
 package org.apache.s2graph.loader.subscriber
 
-import org.apache.hadoop.hbase.client.Put
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat}
-import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId, LabelWithDirection}
-import org.apache.s2graph.loader.spark.{KeyFamilyQualifier, HBaseContext, FamilyHFileWriteOptions}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.s2graph.spark.spark.SparkApp
-import org.apache.spark.{SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.hbase.async.{PutRequest}
+import org.hbase.async.PutRequest
 import play.api.libs.json.Json
+
 import scala.collection.JavaConversions._
 
 object TransferToHFile extends SparkApp {
 
-  val usages =
-    s"""
-       |create HFiles for hbase table on zkQuorum specified.
-       |note that hbase table is created already and pre-splitted properly.
-       |
-       |params:
-       |0. input: hdfs path for tsv file(bulk format).
-       |1. output: hdfs path for storing HFiles.
-       |2. zkQuorum: running hbase cluster zkQuorum.
-       |3. tableName: table name for this bulk upload.
-       |4. dbUrl: db url for parsing to graph element.
-     """.stripMargin
+  var options:GraphFileOptions = _
+
+  case class GraphFileOptions(input: String = "",
+                              tmpPath: String = "",
+                              zkQuorum: String = "",
+                              tableName: String = "",
+                              dbUrl: String = "",
+                              dbUser: String = "",
+                              dbPassword: String = "",
+                              maxHFilePerRegionServer: Int = 1,
+                              labelMapping: Map[String, String] = Map.empty[String, String],
+                              autoEdgeCreate: Boolean = false,
+                              buildDegree: Boolean = false,
+                              compressionAlgorithm: String = "") {
+    def toConfigParams = {
+      Map(
+        "hbase.zookeeper.quorum" -> zkQuorum,
+        "db.default.url" -> dbUrl,
+        "db.default.user" -> dbUser,
+        "db.default.password" -> dbPassword
+      )
+    }
+  }
+
+  val parser = new scopt.OptionParser[GraphFileOptions]("run") {
+
+    opt[String]('i', "input").required().action( (x, c) =>
+      c.copy(input = x) ).text("hdfs path for tsv file(bulk format)")
+
+    opt[String]('m', "tmpPath").required().action( (x, c) =>
+      c.copy(tmpPath = x) ).text("temp hdfs path for storing HFiles")
+
+    opt[String]('z', "zkQuorum").required().action( (x, c) =>
+      c.copy(zkQuorum = x) ).text("zookeeper config for hbase")
+
+    opt[String]('t', "table").required().action( (x, c) =>
+      c.copy(tableName = x) ).text("table name for this bulk upload.")
+
+    opt[String]('c', "dbUrl").required().action( (x, c) =>
+      c.copy(dbUrl = x)).text("jdbc connection url.")
+
+    opt[String]('u', "dbUser").required().action( (x, c) =>
+      c.copy(dbUser = x)).text("database user name.")
+
+    opt[String]('p', "dbPassword").required().action( (x, c) =>
+      c.copy(dbPassword = x)).text("database password.")
+
+    opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer."
+    )
+
+    opt[String]('l', "labelMapping").action( (x, c) =>
+      c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)")
+
+    opt[Boolean]('d', "buildDegree").action( (x, c) =>
+      c.copy(buildDegree = x)).text("generate degree values")
+
+    opt[Boolean]('a', "autoEdgeCreate").action( (x, c) =>
+      c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
+  }
 
   //TODO: Process AtomicIncrementRequest too.
   /** build key values */
   case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
 
+  private def toLabelMapping(lableMapping: String): Map[String, String] = {
+    (for {
+      token <- lableMapping.split(",")
+      inner = token.split(":") if inner.length == 2
+    } yield {
+      (inner.head, inner.last)
+    }).toMap
+  }
+
   private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
     val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
     buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
@@ -83,14 +142,17 @@ object TransferToHFile extends SparkApp {
       output <- List(degreeKey -> 1L) ++ extra
     } yield output
   }
+
   def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
     val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
     kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
   }
+
   def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
     val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
     kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
   }
+
   def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
     val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
     val dir = GraphUtil.directions(direction)
@@ -101,7 +163,7 @@ object TransferToHFile extends SparkApp {
 
     val ts = System.currentTimeMillis()
     val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
+    val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
 
     edge.edgesWithIndex.flatMap { indexEdge =>
       GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
@@ -115,13 +177,13 @@ object TransferToHFile extends SparkApp {
       (key, value) <- degreeKeyVals
       putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value)
     } yield {
-        val p = putRequest
-        val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
-        kv
-      }
+      val p = putRequest
+      val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
+      kv
+    }
     kvs.toIterator
   }
-
+
   def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
     val kvList = new java.util.ArrayList[KeyValue]
     for (s <- strs) {
@@ -143,73 +205,81 @@ object TransferToHFile extends SparkApp {
         val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value)
         kvList.add(kv)
       }
-      }
+    }
     }
   }
   kvList.iterator()
 }
-
+
+  def generateKeyValues(sc: SparkContext,
+                        s2Config: Config,
+                        input: RDD[String],
+                        graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
+    val kvs = input.mapPartitions { iter =>
+      GraphSubscriberHelper.apply(s2Config)
 
-  override def run() = {
-    val input = args(0)
-    val tmpPath = args(1)
-    val zkQuorum = args(2)
-    val tableName = args(3)
-    val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
-    val conf = sparkConf(s"$input: TransferToHFile")
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer.mb", "24")
+      toKeyValues(iter.toSeq, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate)
+    }
 
-    val sc = new SparkContext(conf)
+    if (!graphFileOptions.buildDegree) kvs
+    else {
+      kvs ++ buildDegrees(input, graphFileOptions.labelMapping, graphFileOptions.autoEdgeCreate).reduceByKey { (agg, current) =>
+        agg + current
+      }.mapPartitions { iter =>
+        GraphSubscriberHelper.apply(s2Config)
 
-    val phase = System.getProperty("phase")
-    GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-    GraphSubscriberHelper.management.createStorageTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
+        toKeyValues(iter.toSeq)
+      }
+    }
+  }
 
-    /* set up hbase init */
+  def toHBaseConfig(graphFileOptions: GraphFileOptions): Configuration = {
     val hbaseConf = HBaseConfiguration.create()
-    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
-    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
-    hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
+
+    hbaseConf.set("hbase.zookeeper.quorum", graphFileOptions.zkQuorum)
+    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, graphFileOptions.tableName)
+    hbaseConf.set("hadoop.tmp.dir", s"/tmp/${graphFileOptions.tableName}")
 
-    val rdd = sc.textFile(input)
-
+    hbaseConf
+  }
 
-    val kvs = rdd.mapPartitions { iter =>
-      val phase = System.getProperty("phase")
-      GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-      toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
+  override def run() = {
+    parser.parse(args, GraphFileOptions()) match {
+      case Some(o) => options = o
+      case None =>
+        parser.showUsage()
+        throw new IllegalArgumentException("failed to parse options...")
     }
-    val merged = if (!buildDegree) kvs
-    else {
-      kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
-        agg + current
-      }.mapPartitions { iter =>
-        val phase = System.getProperty("phase")
-        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-        toKeyValues(iter.toSeq)
-      }
-    }
+    println(s">>> Options: ${options}")
+    val s2Config = Management.toConfig(options.toConfigParams)
+
+    val conf = sparkConf(s"TransferToHFile")
+
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "24")
+
+    val sc = new SparkContext(conf)
+    val rdd = sc.textFile(options.input)
+
+    GraphSubscriberHelper.apply(s2Config)
+
+    val merged = TransferToHFile.generateKeyValues(sc, s2Config, rdd, options)
+
+    /* set up hbase init */
+    val hbaseSc = new HBaseContext(sc, toHBaseConfig(options))
 
-    val hbaseSc = new HBaseContext(sc, hbaseConf)
     def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
       val k = new KeyFamilyQualifier(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv))
       val v = CellUtil.cloneValue(kv)
       Seq((k -> v)).toIterator
     }
+
     val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase, BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase)
     val familyOptionsMap = Map("e".getBytes("UTF-8") -> familyOptions, "v".getBytes("UTF-8") -> familyOptions)
 
-    hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap)
+    hbaseSc.bulkLoad(merged, TableName.valueOf(options.tableName), flatMap, options.tmpPath, familyOptionsMap)
   }
 }
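The scopt parser above replaces the old positional-argument handling in run(). As a quick illustration of how it is expected to behave (standard scopt 3.x semantics; the argument values below are made up and not part of the commit):

    // A sketch: exercising the new parser with sample arguments
    // that mirror the flags loader.py now emits.
    val sampleArgs = Array(
      "--input", "/user/test/bulk_edges.tsv",
      "--tmpPath", "/tmp/s2graph",
      "--zkQuorum", "localhost",
      "--table", "s2graph",
      "--dbUrl", "jdbc:mysql://localhost:3306/graph_dev",
      "--dbUser", "graph",
      "--dbPassword", "graph",
      "--labelMapping", "friends_src:friends",
      "--autoEdgeCreate", "true")

    // parse returns Some(options) only when every required flag is present.
    TransferToHFile.parser.parse(sampleArgs, TransferToHFile.GraphFileOptions()) match {
      case Some(opts) =>
        // toLabelMapping has split "friends_src:friends" into a one-entry Map.
        assert(opts.labelMapping == Map("friends_src" -> "friends"))
        assert(opts.autoEdgeCreate && !opts.buildDegree) // buildDegree keeps its default
      case None =>
        () // scopt has already printed which required flag was missing or malformed
    }

Unlike the old positional interface, omitting a value such as --dbUser now fails the parse up front instead of silently shifting every later argument into the wrong slot.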
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
----------------------------------------------------------------------
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
new file mode 100644
index 0000000..36ee530
--- /dev/null
+++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.loader.subscriber
+
+import java.util
+
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.storage.CanSKeyValue
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import scala.util.Try
+
+class TransferToHFileTest extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  import TransferToHFile._
+
+  private val master = "local[2]"
+  private val appName = "example-spark"
+
+  private var sc: SparkContext = _
+
+  /* TransferHFile parameters */
+  val options = GraphFileOptions(
+    zkQuorum = "localhost",
+    dbUrl = "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    dbUser = "sa",
+    dbPassword = "sa",
+    tableName = "s2graph",
+    maxHFilePerRegionServer = 1,
+    compressionAlgorithm = "gz",
+    buildDegree = false,
+    autoEdgeCreate = false)
+
+  val s2Config = Management.toConfig(options.toConfigParams)
+
+  val tableName = options.tableName
+  val schemaVersion = HBaseType.DEFAULT_VERSION
+  val compressionAlgorithm: String = options.compressionAlgorithm
+
+  override def beforeAll(): Unit = {
+    // initialize spark context.
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    sc = new SparkContext(conf)
+
+    GraphSubscriberHelper.apply(s2Config)
+  }
+
+  override def afterAll(): Unit = {
+    GraphSubscriberHelper.g.shutdown()
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+
+
+  test("test TransferToHFile Local.") {
+    import scala.collection.JavaConverters._
+    import org.apache.s2graph.core.storage.CanSKeyValue._
+
+    /* initialize model for test */
+    val management = GraphSubscriberHelper.management
+
+    val service = management.createService(serviceName = "s2graph", cluster = "localhost",
+      hTableName = "s2graph", preSplitSize = -1, hTableTTL = -1, compressionAlgorithm = "gz")
+
+    val serviceColumn = management.createServiceColumn(service.serviceName, "user", "string", new util.ArrayList[Prop]())
+
+    Try {
+      management.createLabel("friends", serviceColumn, serviceColumn, isDirected = true,
+        serviceName = service.serviceName, indices = new java.util.ArrayList[Index],
+        props = Seq(Prop("since", "0", "long"), Prop("score", "0", "integer")).asJava, consistencyLevel = "strong", hTableName = tableName,
+        hTableTTL = -1, schemaVersion = schemaVersion, compressionAlgorithm = compressionAlgorithm, options = "")
+    }
+
+    val label = Label.findByName("friends").getOrElse(throw new IllegalArgumentException("friends label is not initialized."))
+    /* end of initialize model */
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val input = sc.parallelize(Seq(bulkEdgeString))
+
+    val kvs = TransferToHFile.generateKeyValues(sc, s2Config, input, options)
+
+    val ls = kvs.map(kv => CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)).collect().toList
+
+    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe
+
+//    val snapshotEdgeOpt = serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.head), None)
+//    val indexEdgeOpt = serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(ls.last), None)
+
+    val bulkEdge = GraphSubscriberHelper.g.elementBuilder.toGraphElement(bulkEdgeString, options.labelMapping).get
+
+    val indexEdges = ls.flatMap { kv =>
+      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(kv), None)
+    }
+
+    val indexEdge = indexEdges.head
+
+    bulkEdge shouldBe(indexEdge)
+  }
+}


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index 775afda..57adc8a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -65,6 +65,10 @@ object CanSKeyValue {
     SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
   }
 
+  implicit val hbaseKeyValue = instance[org.apache.hadoop.hbase.KeyValue] { kv =>
+    SKeyValue(Array.empty[Byte], kv.getRow, kv.getFamily, kv.getQualifier, kv.getValue, kv.getTimestamp)
+  }
+
   // For asyncbase KeyValues
   implicit val sKeyValue = instance[SKeyValue](identity)
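The new hbaseKeyValue instance is what lets TransferToHFileTest convert the output of generateKeyValues back into SKeyValues. A minimal sketch of the typeclass in use (the row, family, and qualifier bytes are arbitrary sample data; the KeyValue constructor used here is the standard HBase client one):

    import org.apache.hadoop.hbase.KeyValue
    import org.apache.s2graph.core.storage.CanSKeyValue

    val kv = new KeyValue("row".getBytes("UTF-8"), "e".getBytes("UTF-8"),
      "q".getBytes("UTF-8"), 1416236400000L, "v".getBytes("UTF-8"))

    // Summon the instance explicitly, exactly as the test does ...
    val skv = CanSKeyValue.hbaseKeyValue.toSKeyValue(kv)

    // ... or consume it through a context bound, the usual typeclass style.
    def rows[T: CanSKeyValue](kvs: Seq[T]): Seq[Array[Byte]] =
      kvs.map(kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv).row)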
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/cd41b8f1/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
index 32d640c..78da629 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerDe.scala
@@ -70,7 +70,7 @@ trait StorageSerDe {
     **/
   def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge]
 
-  def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable
+  def indexEdgeDeserializer(schemaVer: String): Deserializable[S2EdgeLike]
 
   def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike]
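Widening the return type from the concrete IndexEdgeDeserializable to Deserializable[S2EdgeLike] is what the new test leans on: callers can round-trip index-edge key values into edges without referencing the concrete deserializer class. Roughly, reusing names from the test above (serDe, label, and skv, one of the collected SKeyValues; fromKeyValues returning an Option is inferred from the test's flatMap):

    val serDe = GraphSubscriberHelper.g.defaultStorage.serDe

    // The deserializer is now typed against the storage-agnostic interface,
    // so one call turns a bulk-loaded key value back into an edge.
    val edgeOpt: Option[S2EdgeLike] =
      serDe.indexEdgeDeserializer(label.schemaVersion).fromKeyValues(Seq(skv), None)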
