Update TransferToHFile.scala

add vertex file bulkload

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2c3bb984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2c3bb984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2c3bb984

Branch: refs/heads/master
Commit: 2c3bb98424830d8b90d8064632ec53071c7e9dfd
Parents: 1e11e0e
Author: zhangmengzju <[email protected]>
Authored: Tue Oct 10 15:49:39 2017 +0800
Committer: GitHub <[email protected]>
Committed: Tue Oct 10 15:49:39 2017 +0800

----------------------------------------------------------------------
 .../loader/subscriber/TransferToHFile.scala     | 38 +++++++++++++-------
 1 file changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2c3bb984/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 8b62d36..a5d73f4 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -121,22 +121,34 @@ object TransferToHFile extends SparkApp {
       }
     kvs.toIterator
   }
-
+  
   def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], 
autoEdgeCreate: Boolean): Iterator[KeyValue] = {
-    val kvs = for {
-      s <- strs
-      element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq 
if element.isInstanceOf[S2Edge]
-      edge = element.asInstanceOf[S2Edge]
-      putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
-    } yield {
-        val p = putRequest
-        val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, 
p.value)
-
-   
-        kv
+    val kvList = new util.ArrayList[KeyValue]
+    for (s <- strs) {
+      val elementList = Graph.toGraphElement(s, labelMapping).toSeq
+      for (element <- elementList) {
+        if (element.isInstanceOf[S2Edge]) {
+          val edge = element.asInstanceOf[S2Edge]
+          val putRequestList = insertBulkForLoaderAsync(edge, autoEdgeCreate)
+          for (p <- putRequestList) {
+            val kv = new KeyValue(p.key(), p.family(), p.qualifier, 
p.timestamp, p.value)
+            kvList.add(kv)
+          }
+        } else if (element.isInstanceOf[S2Vertex]) {
+          val vertex = element.asInstanceOf[S2Vertex]
+          val putRequestList = 
GraphSubscriberHelper.g.storage.vertexSerializer(vertex).toKeyValues.map { kv =>
+            new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, 
kv.timestamp)
+          }
+          for (p <- putRequestList) {
+            val kv = new KeyValue(p.key(), p.family(), p.qualifier, 
p.timestamp, p.value)
+            kvList.add(kv)
+          }
+        } 
       }
-    kvs.toIterator
+    }
+    kvList.iterator()
   }
+  
 
 
   override def run() = {

Reply via email to