Update TransferToHFile.scala: add vertex file bulkload
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2c3bb984 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2c3bb984 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2c3bb984 Branch: refs/heads/master Commit: 2c3bb98424830d8b90d8064632ec53071c7e9dfd Parents: 1e11e0e Author: zhangmengzju <[email protected]> Authored: Tue Oct 10 15:49:39 2017 +0800 Committer: GitHub <[email protected]> Committed: Tue Oct 10 15:49:39 2017 +0800 ---------------------------------------------------------------------- .../loader/subscriber/TransferToHFile.scala | 38 +++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2c3bb984/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index 8b62d36..a5d73f4 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -121,22 +121,34 @@ object TransferToHFile extends SparkApp { } kvs.toIterator } - + def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { - val kvs = for { - s <- strs - element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[S2Edge] - edge = element.asInstanceOf[S2Edge] - putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate) - } yield { - val p = putRequest - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - - - kv + val 
kvList = new util.ArrayList[KeyValue] + for (s <- strs) { + val elementList = Graph.toGraphElement(s, labelMapping).toSeq + for (element <- elementList) { + if (element.isInstanceOf[S2Edge]) { + val edge = element.asInstanceOf[S2Edge] + val putRequestList = insertBulkForLoaderAsync(edge, autoEdgeCreate) + for (p <- putRequestList) { + val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) + kvList.add(kv) + } + } else if (element.isInstanceOf[S2Vertex]) { + val vertex = element.asInstanceOf[S2Vertex] + val putRequestList = GraphSubscriberHelper.g.storage.vertexSerializer(vertex).toKeyValues.map { kv => + new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) + } + for (p <- putRequestList) { + val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) + kvList.add(kv) + } + } } - kvs.toIterator + } + kvList.iterator() } + override def run() = {
