Repository: incubator-s2graph
Updated Branches:
  refs/heads/master a07c4d2d3 -> b21db657e


add S2GraphSource.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ef72257e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ef72257e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ef72257e

Branch: refs/heads/master
Commit: ef72257ee875cb9ba1774d568ae3bbf6c6e2e262
Parents: 1799ae4
Author: DO YUNG YOON <[email protected]>
Authored: Wed Apr 4 13:49:03 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Apr 4 13:49:03 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/S2VertexLike.scala  |  6 +-
 .../org/apache/s2graph/s2jobs/DegreeKey.scala   | 32 ++++++++-
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   | 23 -------
 .../s2graph/s2jobs/loader/HFileGenerator.scala  | 60 +++++++++++++++--
 .../s2jobs/serde/reader/IdentityReader.scala    |  9 +++
 .../s2jobs/serde/reader/S2GraphCellReader.scala | 37 +++++++++++
 .../s2jobs/serde/writer/DataFrameWriter.scala   | 25 +++++++
 .../s2jobs/serde/writer/IdentityWriter.scala    | 13 ++++
 .../org/apache/s2graph/s2jobs/task/Source.scala | 21 ++++++
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 69 ++++++++++++++++++++
 10 files changed, 260 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 5612525..2fbc4f1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -68,10 +68,8 @@ trait S2VertexLike extends Vertex with GraphElement {
       if (!id.storeColId) ("", "")
       else (serviceColumn.service.serviceName, serviceColumn.columnName)
 
-    if (propsWithName.nonEmpty)
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
-    else
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t")
+
+    Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
   }
 
   def vertices(direction: Direction, edgeLabels: String*): util.Iterator[Vertex] = {

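After this change the vertex log line always carries seven tab-separated fields, with the props JSON serialized even when it is empty. A minimal sketch of a consumer that can now rely on the fixed field count (this parser is illustrative, not part of the commit):

    // Hypothetical parser for the now fixed-width vertex log line:
    // ts \t op \t "v" \t innerId \t serviceName \t columnName \t propsJson
    def parseVertexLog(line: String): Option[(Long, String, String, String, String, String, String)] =
      line.split("\t", -1) match {  // -1 keeps trailing empty fields (serviceName/columnName may be "")
        case Array(ts, op, elem, id, service, column, props) =>
          Some((ts.toLong, op, elem, id, service, column, props))
        case _ => None
      }
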
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
index f1efac3..561c676 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
@@ -19,9 +19,12 @@
 
 package org.apache.s2graph.s2jobs
 
-import org.apache.s2graph.core.{GraphElement, S2Edge, S2Graph, S2Vertex}
+import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import play.api.libs.json.Json
 
 
 object DegreeKey {
@@ -38,10 +41,35 @@ object DegreeKey {
     }
   }
 
+  def toEdge(s2: S2Graph, degreeKey: DegreeKey, count: Long): S2EdgeLike = {
+    val labelName = degreeKey.labelName
+    val direction = degreeKey.direction
+    val vertexId = degreeKey.vertexIdStr
+    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
+    val dir = GraphUtil.directions(direction)
+    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
+      throw new RuntimeException(s"$vertexId can not be converted into innerval")
+    }
+    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion),
+      LabelMeta.degree -> InnerValLikeWithTs.withLong(count, ts, label.schemaVersion))
+
+    s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
+  }
+
   def toSKeyValue(s2: S2Graph,
                   degreeKey: DegreeKey,
                   count: Long): Seq[SKeyValue] = {
-    S2GraphHelper.buildDegreePutRequests(s2, degreeKey.vertexIdStr, degreeKey.labelName, degreeKey.direction, count)
+    try {
+      val edge = toEdge(s2, degreeKey, count)
+      edge.edgesWithIndex.flatMap { indexEdge =>
+        s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
+      }
+    } catch {
+      case e: Exception => Nil
+    }
   }
 
   def toKeyValue(s2: S2Graph, degreeKey: DegreeKey, count: Long): Seq[HKeyValue] = {

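For reference, a sketch of how the new helpers compose, assuming DegreeKey is a case class exposing the three fields read above and an initialized S2Graph `s2`; the label and vertex id are made-up values:

    // Hypothetical usage: build a degree self-loop edge, then its index-edge key-values.
    val degreeKey = DegreeKey(vertexIdStr = "alice", labelName = "friends", direction = "out")
    val degreeEdge = DegreeKey.toEdge(s2, degreeKey, count = 42L)  // carries LabelMeta.degree = 42
    val kvs = DegreeKey.toSKeyValue(s2, degreeKey, count = 42L)    // Nil if label lookup or id conversion fails
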
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 6e68d28..845b343 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -21,9 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 import play.api.libs.json.{JsObject, Json}
@@ -36,27 +34,6 @@ object S2GraphHelper {
     new S2Graph(config)
   }
 
-  def buildDegreePutRequests(s2: S2Graph,
-                             vertexId: String,
-                             labelName: String,
-                             direction: String,
-                             degreeVal: Long): Seq[SKeyValue] = {
-    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
-    val dir = GraphUtil.directions(direction)
-    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
-      throw new RuntimeException(s"$vertexId can not be converted into innerval")
-    }
-    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
-
-    val ts = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
-
-    edge.edgesWithIndex.flatMap { indexEdge =>
-      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
-    }
-  }
-
   private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
     val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index e7535d4..ee6b842 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -21,20 +21,27 @@ package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Scan}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
+import org.apache.hadoop.hbase.mapreduce._
 import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.hbase.util.{Base64, Bytes}
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil
+import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.util.ToolRunner
+import org.apache.s2graph.core.Management
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.{DataFrameWriter, KeyValueWriter}
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
@@ -131,5 +138,46 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     val hbaseConfig = HBaseConfiguration.create()
     ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
   }
+
+  def tableSnapshotDump(ss: SparkSession,
+           config: Config,
+           options: GraphFileOptions,
+           snapshotPath: String,
+           restorePath: String,
+           tableNames: Seq[String],
+           columnFamily: String = "e",
+           batchSize: Int = 1000): DataFrame = {
+    import ss.sqlContext.implicits._
+
+    val cf = Bytes.toBytes(columnFamily)
+
+    val hbaseConfig = HBaseConfiguration.create(ss.sparkContext.hadoopConfiguration)
+    hbaseConfig.set("hbase.rootdir", snapshotPath)
+
+    val initial = ss.sparkContext.parallelize(Seq.empty[Seq[Cell]])
+    val input = tableNames.foldLeft(initial) { case (prev, tableName) =>
+      val scan = new Scan
+      scan.addFamily(cf)
+      scan.setBatch(batchSize)
+      TableSnapshotInputFormatImpl.setInput(hbaseConfig, tableName, new Path(restorePath))
+      hbaseConfig.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
+
+      val job = Job.getInstance(hbaseConfig, "Decode index edge from " + tableName)
+      val current = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
+        classOf[TableSnapshotInputFormat],
+        classOf[ImmutableBytesWritable],
+        classOf[Result]).map(_._2.listCells().asScala.toSeq)
+
+      prev ++ current
+    }
+
+    implicit val reader = new S2GraphCellReader
+    implicit val writer = new DataFrameWriter
+
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+    val kvs = transformer.transform(input)
+
+    kvs.toDF(DataFrameWriter.Fields: _*)
+  }
 }
 

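A sketch of driving the new dump from a Spark job follows; the paths and table name are placeholders, and GraphFileOptions is assumed to be default-constructible here (in practice it is parsed from job arguments, cf. TaskConf.toGraphFileOptions below):

    // Hypothetical driver for tableSnapshotDump (all values illustrative).
    val ss = SparkSession.builder().appName("snapshot-dump").getOrCreate()
    val options = GraphFileOptions()                          // assumption: defaults suffice here
    val config = Management.toConfig(options.toConfigParams)

    val df = HFileGenerator.tableSnapshotDump(
      ss, config, options,
      snapshotPath = "hdfs:///hbase",         // hbase.rootdir that holds the snapshot
      restorePath  = "hdfs:///tmp/restore",   // scratch dir the snapshot is restored into
      tableNames   = Seq("s2graph"))
    df.show()
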
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
new file mode 100644
index 0000000..b4d1eb2
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
@@ -0,0 +1,9 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+
+class IdentityReader extends GraphElementReadable[GraphElement] {
+  override def read(graph: S2Graph)(data: GraphElement): Option[GraphElement] =
+    Option(data)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
new file mode 100644
index 0000000..868f0f2
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
@@ -0,0 +1,37 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.hadoop.hbase.Cell
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.storage.SKeyValue
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+
+class S2GraphCellReader extends GraphElementReadable[Seq[Cell]]{
+  override def read(s2: S2Graph)(cells: Seq[Cell]): Option[GraphElement] = {
+    if (cells.isEmpty) None
+    else {
+      //TODO:
+      val cell = cells.head
+      val schemaVer = HBaseType.DEFAULT_VERSION
+      val cf = cell.getFamily
+
+      val kvs = cells.map { cell =>
+        new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
+          cell.getValue, cell.getTimestamp, SKeyValue.Default)
+      }
+
+      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
+        s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
+        val indexEdgeOpt = s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
+        if (indexEdgeOpt.isDefined) indexEdgeOpt.map(_.asInstanceOf[GraphElement])
+        else {
+          val snapshotEdgeOpt = s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
+          if (snapshotEdgeOpt.isDefined) snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
+          else throw new IllegalStateException(s"column family indicate this is edge, but neither snapshot/index edge.")
+        }
+      } else throw new IllegalStateException(s"wrong column family. ${Bytes.toString(cf)}")
+    }
+  }
+}

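The reader expects all cells of a single HBase row, which is exactly what tableSnapshotDump's per-Result mapping hands it. A sketch of calling it directly, assuming an initialized S2Graph and a Result obtained from any scan:

    // Hypothetical: decode one scanned row back into a GraphElement.
    import org.apache.hadoop.hbase.client.Result
    import org.apache.s2graph.core.{GraphElement, S2Graph}
    import scala.collection.JavaConverters._

    def decodeRow(s2: S2Graph, result: Result): Option[GraphElement] = {
      val cells = Option(result.listCells()).map(_.asScala.toSeq).getOrElse(Nil)
      new S2GraphCellReader().read(s2)(cells)  // None for an empty row
    }
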
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
new file mode 100644
index 0000000..458821e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+object DataFrameWriter {
+  type GraphElementTuple = (String, String, String, String, String, String, String, String)
+  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+}
+
+class DataFrameWriter extends GraphElementWritable[DataFrameWriter.GraphElementTuple]{
+  override def write(s2: S2Graph)(element: GraphElement): (String, String, String, String, String, String, String, String) = {
+    val Array(ts, op, elem, from, to, label, props, dir) = element.toLogString().split("\t")
+
+    (ts, op, elem, from, to, label, props, dir)
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): (String, String, String, String, String, String, String, String) = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+    val Array(ts, op, elem, from, to, label, props, dir) = element.toLogString().split("\t")
+
+    (ts, op, elem, from, to, label, props, dir)
+  }
+}

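As a quick illustration of the field mapping, reusing the bulk-edge format from the test below and assuming an initialized S2Graph `s2` with a friends label (the pattern match relies on edge toLogString() producing exactly eight tab-separated fields):

    // Hypothetical check of DataFrameWriter's output shape.
    val writer = new DataFrameWriter
    val bulk = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000}"
    val rowOpt = s2.elementBuilder.toEdge(bulk).map(e => writer.write(s2)(e))
    // rowOpt: Option of ("1416236400000", "insert", "e", "a", "b", "friends", <propsJson>, <direction>)
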
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
new file mode 100644
index 0000000..32bab6a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
@@ -0,0 +1,13 @@
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+class IdentityWriter extends GraphElementWritable[GraphElement]{
+  override def write(s2: S2Graph)(element: GraphElement): GraphElement = element
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): GraphElement =
+    DegreeKey.toEdge(s2, degreeKey, count)
+
+}

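Paired with IdentityReader above, this writer lets SparkBulkLoaderTransformer pass graph elements through unchanged while degree counts are still folded into degree edges. A sketch under the same implicit-parameter convention tableSnapshotDump uses (config/options as in HFileGenerator; the empty RDD is a stand-in input):

    // Hypothetical pass-through pipeline: GraphElement in, GraphElement out.
    import org.apache.spark.rdd.RDD
    import org.apache.s2graph.core.GraphElement

    implicit val reader = new IdentityReader
    implicit val writer = new IdentityWriter

    val transformer = new SparkBulkLoaderTransformer(config, options)
    val elements: RDD[GraphElement] = ss.sparkContext.emptyRDD  // stand-in input
    val out = transformer.transform(elements)
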
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index d2ca8ef..9ac9296 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,6 +19,9 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.HFileGenerator
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 
@@ -102,3 +105,21 @@ class HiveSource(conf:TaskConf) extends Source(conf) {
     ss.sql(sql)
   }
 }
+
+class HBaseTableSnapshotSource(conf: TaskConf) extends Source(conf) {
+
+  override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
+
+  override def toDF(ss: SparkSession): DataFrame = {
+    val options = TaskConf.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+
+    val snapshotPath = conf.options("hbase.rootdir")
+    val restorePath = conf.options("restore.path")
+    val tableNames = conf.options("hbase.table.names").split(",")
+    val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
+    val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
+
+    HFileGenerator.tableSnapshotDump(ss, config, options, snapshotPath, restorePath, tableNames, columnFamily, batchSize)
+  }
+}
\ No newline at end of file

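A sketch of wiring the new source into a task; the option values are placeholders, the task type string is illustrative (routing lives elsewhere in the job framework), and the loader options consumed by TaskConf.toGraphFileOptions would also need to be present in the map:

    // Hypothetical task configuration for HBaseTableSnapshotSource.
    val conf = TaskConf("snapshot", "hbase_snapshot", Seq.empty, Map(
      "hbase.rootdir"     -> "hdfs:///hbase",
      "restore.path"      -> "hdfs:///tmp/restore",
      "hbase.table.names" -> "s2graph,s2graph_tc",
      "hbase.table.cf"    -> "e",      // optional, defaults to "e"
      "scan.batch.size"   -> "1000"    // optional, defaults to "1000"
    ))
    val df = new HBaseTableSnapshotSource(conf).toDF(ss)  // ss: active SparkSession
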
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
new file mode 100644
index 0000000..86cdf2b
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SourceTest extends BaseSparkTest {
+  def toDataFrame(edges: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+    elements.map { e =>
+      (e.getTs(),
+        e.getOperation(),
+        "e",
+        e.srcVertex.innerIdVal.toString,
+        e.tgtVertex.innerIdVal.toString,
+        e.label(),
+        "{}",
+        e.getDirection())
+    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+  }
+
+  test("S2graphSink writeBatch.") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val dumpArgs = Map(
+      "hbase.rootdir" -> "",
+      "restore.path" -> "",
+      "hbase.table.names" -> Seq(options.tableName).mkString(","),
+      "hbase.table.cf" -> "e"
+    )
+    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+}
