Repository: incubator-s2graph
Updated Branches:
  refs/heads/master a07c4d2d3 -> b21db657e
add S2GraphSource.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ef72257e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ef72257e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ef72257e

Branch: refs/heads/master
Commit: ef72257ee875cb9ba1774d568ae3bbf6c6e2e262
Parents: 1799ae4
Author: DO YUNG YOON <[email protected]>
Authored: Wed Apr 4 13:49:03 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Wed Apr 4 13:49:03 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/S2VertexLike.scala  |  6 +-
 .../org/apache/s2graph/s2jobs/DegreeKey.scala   | 32 ++++++++-
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   | 23 -------
 .../s2graph/s2jobs/loader/HFileGenerator.scala  | 60 +++++++++++++++--
 .../s2jobs/serde/reader/IdentityReader.scala    |  9 +++
 .../s2jobs/serde/reader/S2GraphCellReader.scala | 37 +++++++++++
 .../s2jobs/serde/writer/DataFrameWriter.scala   | 25 +++++++
 .../s2jobs/serde/writer/IdentityWriter.scala    | 13 ++++
 .../org/apache/s2graph/s2jobs/task/Source.scala | 21 ++++++
 .../apache/s2graph/s2jobs/task/SourceTest.scala | 69 ++++++++++++++++++++
 10 files changed, 260 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 5612525..2fbc4f1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -68,10 +68,8 @@ trait S2VertexLike extends Vertex with GraphElement {
       if (!id.storeColId) ("", "")
       else (serviceColumn.service.serviceName, serviceColumn.columnName)
 
-    if (propsWithName.nonEmpty)
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
-    else
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t")
+
+    Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t")
   }
 
   def vertices(direction: Direction, edgeLabels: String*): util.Iterator[Vertex] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
index f1efac3..561c676 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
@@ -19,9 +19,12 @@
 
 package org.apache.s2graph.s2jobs
 
-import org.apache.s2graph.core.{GraphElement, S2Edge, S2Graph, S2Vertex}
+import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
+import play.api.libs.json.Json
 
 object DegreeKey {
 
@@ -38,10 +41,35 @@ object DegreeKey {
     }
   }
 
+  def toEdge(s2: S2Graph, degreeKey: DegreeKey, count: Long): S2EdgeLike = {
+    val labelName = degreeKey.labelName
+    val direction = degreeKey.direction
+    val vertexId = degreeKey.vertexIdStr
+    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
+    val dir = GraphUtil.directions(direction)
+    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
+      throw new RuntimeException(s"$vertexId can not be converted into innerval")
+    }
+    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
+
+    val ts = System.currentTimeMillis()
+    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion),
+      LabelMeta.degree -> InnerValLikeWithTs.withLong(count, ts, label.schemaVersion))
+
+    s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
+  }
+
   def toSKeyValue(s2: S2Graph, degreeKey: DegreeKey, count: Long): Seq[SKeyValue] = {
-    S2GraphHelper.buildDegreePutRequests(s2, degreeKey.vertexIdStr, degreeKey.labelName, degreeKey.direction, count)
+    try {
+      val edge = toEdge(s2, degreeKey, count)
+      edge.edgesWithIndex.flatMap { indexEdge =>
+        s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
+      }
+    } catch {
+      case e: Exception => Nil
+    }
   }
 
   def toKeyValue(s2: S2Graph, degreeKey: DegreeKey, count: Long): Seq[HKeyValue] = {
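
The new DegreeKey.toEdge pulls the degree-materialization logic out of S2GraphHelper (see the removal below) and represents a degree count as a self-loop edge whose props carry both the timestamp and LabelMeta.degree, so the key-value and DataFrame writers can share one code path. A minimal usage sketch, assuming a live S2Graph instance `s2`, an existing label "friends", and DegreeKey being a case class over the vertexIdStr/labelName/direction fields read above (named arguments because the constructor is not shown in this diff):

    // Hypothetical sketch: materialize "vertex a has 10 outgoing friends edges".
    val degreeKey = DegreeKey(vertexIdStr = "a", labelName = "friends", direction = "out")

    // A self-loop edge on vertex "a" carrying LabelMeta.degree = 10 in its props.
    val degreeEdge = DegreeKey.toEdge(s2, degreeKey, count = 10L)

    // The same edge serialized to storage key-values; note that toSKeyValue
    // returns Nil instead of propagating serialization failures.
    val kvs = DegreeKey.toSKeyValue(s2, degreeKey, count = 10L)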

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 6e68d28..845b343 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -21,9 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 import play.api.libs.json.{JsObject, Json}
@@ -36,27 +34,6 @@ object S2GraphHelper {
     new S2Graph(config)
   }
 
-  def buildDegreePutRequests(s2: S2Graph,
-                             vertexId: String,
-                             labelName: String,
-                             direction: String,
-                             degreeVal: Long): Seq[SKeyValue] = {
-    val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB."))
-    val dir = GraphUtil.directions(direction)
-    val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
-      throw new RuntimeException(s"$vertexId can not be converted into innerval")
-    }
-    val vertex = s2.elementBuilder.newVertex(SourceVertexId(label.srcColumn, innerVal))
-
-    val ts = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
-    val edge = s2.elementBuilder.newEdge(vertex, vertex, label, dir, propsWithTs = propsWithTs)
-
-    edge.edgesWithIndex.flatMap { indexEdge =>
-      s2.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues
-    }
-  }
-
   private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = {
     val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
index e7535d4..ee6b842 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala
@@ -21,20 +21,27 @@ package org.apache.s2graph.s2jobs.loader
 
 import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Scan}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.mapreduce.{LoadIncrementalHFiles, TableOutputFormat}
+import org.apache.hadoop.hbase.mapreduce._
 import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName}
+import org.apache.hadoop.hbase.util.{Base64, Bytes}
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil
+import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.util.ToolRunner
+import org.apache.s2graph.core.Management
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
-import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader
-import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.serde.reader.{S2GraphCellReader, TsvBulkFormatReader}
+import org.apache.s2graph.s2jobs.serde.writer.{DataFrameWriter, KeyValueWriter}
 import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 object HFileGenerator extends RawFileGenerator[String, KeyValue] {
 
@@ -131,5 +138,46 @@ object HFileGenerator extends RawFileGenerator[String, KeyValue] {
     val hbaseConfig = HBaseConfiguration.create()
     ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs)
   }
+
+  def tableSnapshotDump(ss: SparkSession,
+                        config: Config,
+                        options: GraphFileOptions,
+                        snapshotPath: String,
+                        restorePath: String,
+                        tableNames: Seq[String],
+                        columnFamily: String = "e",
+                        batchSize: Int = 1000): DataFrame = {
+    import ss.sqlContext.implicits._
+
+    val cf = Bytes.toBytes(columnFamily)
+
+    val hbaseConfig = HBaseConfiguration.create(ss.sparkContext.hadoopConfiguration)
+    hbaseConfig.set("hbase.rootdir", snapshotPath)
+
+    val initial = ss.sparkContext.parallelize(Seq.empty[Seq[Cell]])
+    val input = tableNames.foldLeft(initial) { case (prev, tableName) =>
+      val scan = new Scan
+      scan.addFamily(cf)
+      scan.setBatch(batchSize)
+
+      TableSnapshotInputFormatImpl.setInput(hbaseConfig, tableName, new Path(restorePath))
+      hbaseConfig.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
+
+      val job = Job.getInstance(hbaseConfig, "Decode index edge from " + tableName)
+      val current = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
+        classOf[TableSnapshotInputFormat],
+        classOf[ImmutableBytesWritable],
+        classOf[Result]).map(_._2.listCells().asScala.toSeq)
+
+      prev ++ current
+    }
+
+    implicit val reader = new S2GraphCellReader
+    implicit val writer = new DataFrameWriter
+
+    val transformer = new SparkBulkLoaderTransformer(config, options)
+    val kvs = transformer.transform(input)
+
+    kvs.toDF(DataFrameWriter.Fields: _*)
+  }
 }
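
tableSnapshotDump builds one TableSnapshotInputFormat RDD per snapshot table, unions them, decodes the raw HBase cells back into graph elements through S2GraphCellReader, and re-serializes each element into the bulk-load column layout via DataFrameWriter. A hedged invocation sketch — the SparkSession, Config, and GraphFileOptions values and every path below are placeholders, and the snapshots must already exist under the given HBase root dir:

    // Hypothetical sketch: dump two exported table snapshots into one DataFrame.
    val df = HFileGenerator.tableSnapshotDump(
      ss = spark,                           // an active SparkSession (assumed in scope)
      config = s2Config,                    // Typesafe Config for the graph (assumed)
      options = graphFileOptions,           // bulk-load options (assumed)
      snapshotPath = "hdfs:///hbase",       // hbase.rootdir that holds the snapshots
      restorePath = "hdfs:///tmp/restore",  // scratch dir the snapshots are restored into
      tableNames = Seq("s2graph", "s2graph_backup"),
      columnFamily = "e",                   // "e" holds edges; "v" holds vertices
      batchSize = 1000)

    // Columns: timestamp, operation, element, from, to, label, props, direction.
    df.printSchema()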

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
new file mode 100644
index 0000000..b4d1eb2
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/IdentityReader.scala
@@ -0,0 +1,9 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+
+class IdentityReader extends GraphElementReadable[GraphElement] {
+  override def read(graph: S2Graph)(data: GraphElement): Option[GraphElement] =
+    Option(data)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
new file mode 100644
index 0000000..868f0f2
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/reader/S2GraphCellReader.scala
@@ -0,0 +1,37 @@
+package org.apache.s2graph.s2jobs.serde.reader
+
+import org.apache.hadoop.hbase.Cell
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.storage.SKeyValue
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.serde.GraphElementReadable
+
+class S2GraphCellReader extends GraphElementReadable[Seq[Cell]] {
+  override def read(s2: S2Graph)(cells: Seq[Cell]): Option[GraphElement] = {
+    if (cells.isEmpty) None
+    else {
+      //TODO:
+      val cell = cells.head
+      val schemaVer = HBaseType.DEFAULT_VERSION
+      val cf = cell.getFamily
+
+      val kvs = cells.map { cell =>
+        new SKeyValue(Array.empty[Byte], cell.getRow, cell.getFamily, cell.getQualifier,
+          cell.getValue, cell.getTimestamp, SKeyValue.Default)
+      }
+
+      if (Bytes.equals(cf, SKeyValue.VertexCf)) {
+        s2.defaultStorage.serDe.vertexDeserializer(schemaVer).fromKeyValues(kvs, None).map(_.asInstanceOf[GraphElement])
+      } else if (Bytes.equals(cf, SKeyValue.EdgeCf)) {
+        val indexEdgeOpt = s2.defaultStorage.serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
+        if (indexEdgeOpt.isDefined) indexEdgeOpt.map(_.asInstanceOf[GraphElement])
+        else {
+          val snapshotEdgeOpt = s2.defaultStorage.serDe.snapshotEdgeDeserializer(schemaVer).fromKeyValues(kvs, None)
+          if (snapshotEdgeOpt.isDefined) snapshotEdgeOpt.map(_.toEdge.asInstanceOf[GraphElement])
+          else throw new IllegalStateException(s"column family indicate this is edge, but neither snapshot/index edge.")
+        }
+      } else throw new IllegalStateException(s"wrong column family. ${Bytes.toString(cf)}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
new file mode 100644
index 0000000..458821e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/DataFrameWriter.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+object DataFrameWriter {
+  type GraphElementTuple = (String, String, String, String, String, String, String, String)
+  val Fields = Seq("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+}
+
+class DataFrameWriter extends GraphElementWritable[DataFrameWriter.GraphElementTuple] {
+  override def write(s2: S2Graph)(element: GraphElement): (String, String, String, String, String, String, String, String) = {
+    val Array(ts, op, elem, from, to, label, props, dir) = element.toLogString().split("\t")
+
+    (ts, op, elem, from, to, label, props, dir)
+  }
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): (String, String, String, String, String, String, String, String) = {
+    val element = DegreeKey.toEdge(s2, degreeKey, count)
+    val Array(ts, op, elem, from, to, label, props, dir) = element.toLogString().split("\t")
+
+    (ts, op, elem, from, to, label, props, dir)
+  }
+}
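
DataFrameWriter leans on GraphElement.toLogString(), which renders an element as one tab-separated line; the S2VertexLike change at the top of this commit makes the props column unconditional precisely so a log line always carries a fixed number of columns. A small sketch of the mapping for an edge line — the literal below is an assumed example value, not output captured from the code:

    // Hypothetical sketch: one edge log line split into DataFrameWriter.Fields.
    val logLine = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"score\":10}\tout"
    val Array(ts, op, elem, from, to, label, props, dir) = logLine.split("\t")
    // ts = "1416236400000", op = "insert", elem = "edge", from = "a",
    // to = "b", label = "friends", props = "{\"score\":10}", dir = "out"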

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
new file mode 100644
index 0000000..32bab6a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/IdentityWriter.scala
@@ -0,0 +1,13 @@
+package org.apache.s2graph.s2jobs.serde.writer
+
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.apache.s2graph.s2jobs.DegreeKey
+import org.apache.s2graph.s2jobs.serde.GraphElementWritable
+
+class IdentityWriter extends GraphElementWritable[GraphElement] {
+  override def write(s2: S2Graph)(element: GraphElement): GraphElement = element
+
+  override def writeDegree(s2: S2Graph)(degreeKey: DegreeKey, count: Long): GraphElement =
+    DegreeKey.toEdge(s2, degreeKey, count)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
index d2ca8ef..9ac9296 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Source.scala
@@ -19,6 +19,9 @@
 
 package org.apache.s2graph.s2jobs.task
 
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.s2jobs.S2GraphHelper
+import org.apache.s2graph.s2jobs.loader.HFileGenerator
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
@@ -102,3 +105,21 @@ class HiveSource(conf:TaskConf) extends Source(conf) {
     ss.sql(sql)
   }
 }
+
+class HBaseTableSnapshotSource(conf: TaskConf) extends Source(conf) {
+
+  override def mandatoryOptions: Set[String] = Set("hbase.rootdir", "restore.path", "hbase.table.names")
+
+  override def toDF(ss: SparkSession): DataFrame = {
+    val options = TaskConf.toGraphFileOptions(conf)
+    val config = Management.toConfig(options.toConfigParams)
+
+    val snapshotPath = conf.options("hbase.rootdir")
+    val restorePath = conf.options("restore.path")
+    val tableNames = conf.options("hbase.table.names").split(",")
+    val columnFamily = conf.options.getOrElse("hbase.table.cf", "e")
+    val batchSize = conf.options.getOrElse("scan.batch.size", "1000").toInt
+
+    HFileGenerator.tableSnapshotDump(ss, config, options, snapshotPath, restorePath, tableNames, columnFamily, batchSize)
+  }
+}
\ No newline at end of file
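
HBaseTableSnapshotSource exposes the snapshot dump as an s2jobs task: the three mandatory options locate the snapshot and the restore scratch dir, while hbase.table.cf and scan.batch.size are optional tuning knobs. A hedged sketch of wiring it up directly, mirroring the positional TaskConf usage in the test below; the task-type string and all paths are placeholders, not values defined by this commit:

    // Hypothetical sketch: construct the new source from a TaskConf.
    val dumpConf = TaskConf("snapshot_dump", "source", Seq("input"), Map(
      "hbase.rootdir"     -> "hdfs:///hbase",
      "restore.path"      -> "hdfs:///tmp/restore",
      "hbase.table.names" -> "s2graph,s2graph_backup",  // comma-separated
      "hbase.table.cf"    -> "e",                       // optional, defaults to "e"
      "scan.batch.size"   -> "1000"))                   // optional, defaults to 1000

    val source = new HBaseTableSnapshotSource(dumpConf)
    val df = source.toDF(spark)  // spark: an active SparkSession (assumed)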

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ef72257e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
new file mode 100644
index 0000000..86cdf2b
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/SourceTest.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.task
+
+import org.apache.s2graph.core.S2EdgeLike
+import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.JavaConverters._
+
+class SourceTest extends BaseSparkTest {
+  def toDataFrame(edges: Seq[String]): DataFrame = {
+    import spark.sqlContext.implicits._
+    val elements = edges.flatMap(s2.elementBuilder.toEdge(_))
+
+    elements.map { e =>
+      (e.getTs(),
+        e.getOperation(),
+        "e",
+        e.srcVertex.innerIdVal.toString,
+        e.tgtVertex.innerIdVal.toString,
+        e.label(),
+        "{}",
+        e.getDirection())
+    }.toDF("timestamp", "operation", "element", "from", "to", "label", "props", "direction")
+  }
+
+  test("S2graphSink writeBatch.") {
+    val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm)
+
+    val bulkEdgeString = "1416236400000\tinsert\tedge\ta\tb\tfriends\t{\"since\":1316236400000,\"score\":10}"
+    val df = toDataFrame(Seq(bulkEdgeString))
+    val args = options.toCommand.grouped(2).map { kv =>
+      kv.head -> kv.last
+    }.toMap
+
+    val conf = TaskConf("test", "sql", Seq("input"), args)
+
+    val sink = new S2graphSink("testQuery", conf)
+    sink.write(df)
+
+    val dumpArgs = Map(
+      "hbase.rootdir" -> "",
+      "restore.path" -> "",
+      "hbase.table.names" -> Seq(options.tableName).mkString(","),
+      "hbase.table.cf" -> "e"
+    )
+    val dumpConf = TaskConf("dump", "sql", Seq("input"), dumpArgs)
+
+    val s2Edges = s2.edges().asScala.toSeq.map(_.asInstanceOf[S2EdgeLike])
+    s2Edges.foreach { edge => println(edge) }
+  }
+}
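
The test writes one bulk edge through S2graphSink and then prints what was stored; dumpConf is prepared but never fed through the new source — an assumption here is that doing so requires a real snapshot exported on disk (note the empty hbase.rootdir and restore.path). Under that assumption, the round-trip the fixture is building toward would look roughly like:

    // Hypothetical continuation, assuming an actual snapshot behind dumpConf:
    val source = new HBaseTableSnapshotSource(dumpConf)
    val dumped = source.toDF(spark)
    assert(dumped.count() == s2Edges.size)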
