Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 1799ae456 -> 5a0e4d835


add skipError option to skip over not-serializable data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/63dd6fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/63dd6fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/63dd6fa2

Branch: refs/heads/master
Commit: 63dd6fa23803f1a76a86f8e53c6115c4dd15cbf9
Parents: 3332f6b
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Apr 2 13:57:15 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Apr 2 13:57:15 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   | 28 +++---
 .../s2jobs/loader/GraphFileOptions.scala        | 14 +--
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  1 +
 .../s2jobs/loader/HFileMRGenerator.scala        |  1 +
 .../loader/LocalBulkLoaderTransformer.scala     | 61 -------------
 .../loader/SparkBulkLoaderTransformer.scala     | 76 ----------------
 .../serde/LocalBulkLoaderTransformer.scala      | 61 +++++++++++++
 .../serde/SparkBulkLoaderTransformer.scala      | 76 ++++++++++++++++
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  5 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 95 ++++++++++----------
 10 files changed, 214 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 3f80e8f..383f39f 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -24,6 +24,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 import play.api.libs.json.Json
 
 import scala.concurrent.ExecutionContext
@@ -54,8 +55,8 @@ object S2GraphHelper {
     }
   }
 
-  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, 
createRelEdges: Boolean = true): Seq[SKeyValue] = {
-    val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
+  private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: 
GraphFileOptions): Seq[SKeyValue] = {
+    val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge)
 
     val snapshotEdgeKeyValues = 
s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues
     val indexEdgeKeyValues = relEdges.flatMap { edge =>
@@ -67,15 +68,20 @@ object S2GraphHelper {
     snapshotEdgeKeyValues ++ indexEdgeKeyValues
   }
 
-  def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean 
= false): Seq[SKeyValue] = {
-    if (element.isInstanceOf[S2Edge]) {
-      val edge = element.asInstanceOf[S2Edge]
-      insertBulkForLoaderAsync(s2, edge, autoEdgeCreate)
-    } else if (element.isInstanceOf[S2Vertex]) {
-      val vertex = element.asInstanceOf[S2Vertex]
-      s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
-    } else {
-      Nil
+  def toSKeyValues(s2: S2Graph, element: GraphElement, option: 
GraphFileOptions): Seq[SKeyValue] = {
+    try {
+      if (element.isInstanceOf[S2Edge]) {
+        val edge = element.asInstanceOf[S2Edge]
+        insertBulkForLoaderAsync(s2, edge, option)
+      } else if (element.isInstanceOf[S2Vertex]) {
+        val vertex = element.asInstanceOf[S2Vertex]
+        
s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues
+      } else {
+        Nil
+      }
+    } catch {
+      case e: Exception =>
+        if (option.skipError) Nil else throw e
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
index 3e3ffb9..e855a32 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala
@@ -50,12 +50,10 @@ object GraphFileOptions {
       c.copy(dbDriver = x)).text("jdbc driver class.")
 
     opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) =>
-      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per 
RegionServer."
-    )
+      c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per 
RegionServer.")
 
     opt[Int]('n', "numRegions").action ( (x, c) =>
-      c.copy(numRegions = x)).text("total numRegions(pre-split size) on table."
-    )
+      c.copy(numRegions = x)).text("total numRegions(pre-split size) on 
table.")
 
     opt[String]('l', "labelMapping").action( (x, c) =>
       c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change 
the label from source (originalLabel:newLabel)")
@@ -67,8 +65,11 @@ object GraphFileOptions {
       c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically")
 
     opt[Boolean]('c', "incrementalLoad").action( (x, c) =>
-      c.copy(incrementalLoad = x)).text("whether incremental bulkload which 
append data on existing table or not."
-    )
+      c.copy(incrementalLoad = x)).text("whether incremental bulkload which 
append data on existing table or not.")
+
+    opt[Boolean]('s', "skipError").action ((x, c) =>
+      c.copy(skipError = x)).text("whether skip error row.")
+
     opt[String]('m', "method").action( (x, c) =>
       c.copy(method = x)).text("run method. currently MR(default)/SPARK 
supported."
     )
@@ -124,6 +125,7 @@ case class GraphFileOptions(input: String = "",
                             autoEdgeCreate: Boolean = false,
                             buildDegree: Boolean = false,
                             incrementalLoad: Boolean = false,
+                            skipError: Boolean = false,
                             compressionAlgorithm: String = "NONE",
                             method: String = "SPARK") {
   def toConfigParams = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index b4ac51f..8ace94a 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, 
TableName}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, 
KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
index 3502bee..fd78718 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
SequenceFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, 
SequenceFileOutputFormat}
 import org.apache.hadoop.mapreduce.{Job, Mapper}
+import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
deleted file mode 100644
index 7d405a6..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.KeyValue
-import org.apache.s2graph.core.{GraphElement, S2Graph}
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-
-import scala.concurrent.ExecutionContext
-
-class LocalBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions)(implicit ec: 
ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
-
-  override val reader = new TsvBulkFormatReader
-  override val writer = new KeyValueWriter
-
-  override def read(input: Seq[String]): Seq[GraphElement] = 
input.flatMap(reader.read(s2)(_))
-
-  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = 
elements.map(writer.write(s2)(_))
-
-  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = 
{
-    val degrees = elements.flatMap { element =>
-      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
-    }.groupBy(_._1).mapValues(_.map(_._2).sum)
-
-    degrees.toSeq.map { case (degreeKey, count) =>
-      DegreeKey.toKeyValue(s2, degreeKey, count)
-    }
-  }
-
-  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
-
-    kvs ++ degrees
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
deleted file mode 100644
index cd991e1..0000000
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.s2jobs.loader
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.GraphElement
-import org.apache.s2graph.s2jobs.serde.Transformer
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
-import org.apache.spark.rdd.RDD
-
-class SparkBulkLoaderTransformer(val config: Config,
-                                 val options: GraphFileOptions) extends 
Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
-  val reader = new TsvBulkFormatReader
-
-  val writer = new KeyValueWriter
-
-  override def read(input: RDD[String]): RDD[GraphElement] = 
input.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.flatMap { line =>
-      reader.read(s2)(line)
-    }
-  }
-
-  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = 
elements.mapPartitions { iter =>
-    val s2 = S2GraphHelper.initS2Graph(config)
-
-    iter.map(writer.write(s2)(_))
-  }
-
-  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] 
= {
-    val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.flatMap { element =>
-        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
-      }
-    }.reduceByKey(_ + _)
-
-    degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
-
-      iter.map { case (degreeKey, count) =>
-        DegreeKey.toKeyValue(s2, degreeKey, count)
-      }
-    }
-  }
-
-  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
-    val elements = read(input)
-    val kvs = write(elements)
-
-    if (options.buildDegree) kvs ++ buildDegrees(elements)
-    kvs
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
new file mode 100644
index 0000000..a185754
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.KeyValue
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+
+import scala.concurrent.ExecutionContext
+
+class LocalBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions)(implicit ec: 
ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] {
+  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+
+  override val reader = new TsvBulkFormatReader
+  override val writer = new KeyValueWriter(options)
+
+  override def read(input: Seq[String]): Seq[GraphElement] = 
input.flatMap(reader.read(s2)(_))
+
+  override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = 
elements.map(writer.write(s2)(_))
+
+  override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = 
{
+    val degrees = elements.flatMap { element =>
+      DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
+    }.groupBy(_._1).mapValues(_.map(_._2).sum)
+
+    degrees.toSeq.map { case (degreeKey, count) =>
+      DegreeKey.toKeyValue(s2, degreeKey, count)
+    }
+  }
+
+  override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    val degrees = if (options.buildDegree) buildDegrees(elements) else Nil
+
+    kvs ++ degrees
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
new file mode 100644
index 0000000..63f4e2c
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.serde
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.GraphElement
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
+import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
+import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper}
+import org.apache.spark.rdd.RDD
+
+class SparkBulkLoaderTransformer(val config: Config,
+                                 val options: GraphFileOptions) extends 
Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] {
+  val reader = new TsvBulkFormatReader
+
+  val writer = new KeyValueWriter(options)
+
+  override def read(input: RDD[String]): RDD[GraphElement] = 
input.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.flatMap { line =>
+      reader.read(s2)(line)
+    }
+  }
+
+  override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = 
elements.mapPartitions { iter =>
+    val s2 = S2GraphHelper.initS2Graph(config)
+
+    iter.map(writer.write(s2)(_))
+  }
+
+  override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] 
= {
+    val degrees = elements.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.flatMap { element =>
+        DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
+      }
+    }.reduceByKey(_ + _)
+
+    degrees.mapPartitions { iter =>
+      val s2 = S2GraphHelper.initS2Graph(config)
+
+      iter.map { case (degreeKey, count) =>
+        DegreeKey.toKeyValue(s2, degreeKey, count)
+      }
+    }
+  }
+
+  override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = {
+    val elements = read(input)
+    val kvs = write(elements)
+
+    if (options.buildDegree) kvs ++ buildDegrees(elements)
+    kvs
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
index 02034af..22eee34 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala
@@ -22,11 +22,12 @@ package org.apache.s2graph.s2jobs.serde.writer
 import org.apache.hadoop.hbase.KeyValue
 import org.apache.s2graph.core.{GraphElement, S2Graph}
 import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 import org.apache.s2graph.s2jobs.serde.GraphElementWritable
 
-class KeyValueWriter(autoEdgeCreate: Boolean = false) extends 
GraphElementWritable[Seq[KeyValue]] {
+class KeyValueWriter(option: GraphFileOptions) extends 
GraphElementWritable[Seq[KeyValue]] {
   override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = {
-    S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv =>
+    S2GraphHelper.toSKeyValues(s2, element, option).map { skv =>
       new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
index 3fbbd88..3bd1a23 100644
--- 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
+++ 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala
@@ -19,20 +19,18 @@
 
 package org.apache.s2graph.s2jobs.loader
 
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{PostProcess, S2VertexLike}
+import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}
 import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, 
SparkBulkLoaderTransformer}
 import play.api.libs.json.Json
 
-import scala.io.Source
-
 class GraphFileGeneratorTest extends BaseSparkTest {
-  import scala.concurrent.ExecutionContext.Implicits.global
+
   import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
 
+  import scala.concurrent.ExecutionContext.Implicits.global
+
   def transformToSKeyValues(transformerMode: String, edges: Seq[String]): 
List[SKeyValue] = {
     transformerMode match {
       case "spark" =>
@@ -56,6 +54,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
         }.toList
     }
   }
+
   test("test generateKeyValues edge only. SparkBulkLoaderTransformer") {
     val label = initTestEdgeSchema(s2, tableName, schemaVersion, 
compressionAlgorithm)
     /* end of initialize model */
@@ -121,7 +120,7 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       println(Json.prettyPrint(jsValue))
     }
 
-    bulkVertex shouldBe(vertex)
+    bulkVertex shouldBe (vertex)
   }
 
   test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") {
@@ -140,46 +139,46 @@ class GraphFileGeneratorTest extends BaseSparkTest {
       println(Json.prettyPrint(jsValue))
     }
 
-    bulkVertex shouldBe(vertex)
+    bulkVertex shouldBe (vertex)
   }
 
-//   this test case expect options.input already exist with valid bulk load 
format.
-//  test("bulk load and fetch vertex: spark mode") {
-//    import scala.collection.JavaConverters._
-//    val serviceColumn = initTestVertexSchema(s2)
-//
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
-//
-//    HFileGenerator.generate(sc, s2Config, input, options)
-//
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
-//
-//    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//
-//    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
-//
-//    println(Json.prettyPrint(json))
-//  }
-
-//   this test case expect options.input already exist with valid bulk load 
format.
-//  test("bulk load and fetch vertex: mr mode") {
-//    val serviceColumn = initTestVertexSchema(s2)
-//
-//    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
-//    val input = sc.parallelize(bulkVertexLs)
-//
-//    HFileMRGenerator.generate(sc, s2Config, input, options)
-//
-//    val hfileArgs = Array(options.output, options.tableName)
-//    val hbaseConfig = HBaseConfiguration.create()
-//
-//    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
-//    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
-//    val json = PostProcess.verticesToJson(s2Vertices)
-//
-//    println(Json.prettyPrint(json))
-//  }
+  //   this test case expect options.input already exist with valid bulk load 
format.
+  //  test("bulk load and fetch vertex: spark mode") {
+  //    import scala.collection.JavaConverters._
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //
+  //    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
+
+  //   this test case expect options.input already exist with valid bulk load 
format.
+  //  test("bulk load and fetch vertex: mr mode") {
+  //    val serviceColumn = initTestVertexSchema(s2)
+  //
+  //    val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq
+  //    val input = sc.parallelize(bulkVertexLs)
+  //
+  //    HFileMRGenerator.generate(sc, s2Config, input, options)
+  //
+  //    val hfileArgs = Array(options.output, options.tableName)
+  //    val hbaseConfig = HBaseConfiguration.create()
+  //
+  //    val ret = ToolRunner.run(hbaseConfig, new 
LoadIncrementalHFiles(hbaseConfig), hfileArgs)
+  //    val s2Vertices = 
s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike])
+  //    val json = PostProcess.verticesToJson(s2Vertices)
+  //
+  //    println(Json.prettyPrint(json))
+  //  }
 }

Reply via email to