Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 1799ae456 -> 5a0e4d835
add skipError option to skip over non-serializable data.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/63dd6fa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/63dd6fa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/63dd6fa2

Branch: refs/heads/master
Commit: 63dd6fa23803f1a76a86f8e53c6115c4dd15cbf9
Parents: 3332f6b
Author: DO YUNG YOON <steams...@apache.org>
Authored: Mon Apr 2 13:57:15 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Mon Apr 2 13:57:15 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/S2GraphHelper.scala   | 28 +++---
 .../s2jobs/loader/GraphFileOptions.scala        | 14 +--
 .../s2graph/s2jobs/loader/HFileGenerator.scala  |  1 +
 .../s2jobs/loader/HFileMRGenerator.scala        |  1 +
 .../loader/LocalBulkLoaderTransformer.scala     | 61 -------------
 .../loader/SparkBulkLoaderTransformer.scala     | 76 ----------------
 .../serde/LocalBulkLoaderTransformer.scala      | 61 +++++++++++++
 .../serde/SparkBulkLoaderTransformer.scala      | 76 ++++++++++++++++
 .../s2jobs/serde/writer/KeyValueWriter.scala    |  5 +-
 .../s2jobs/loader/GraphFileGeneratorTest.scala  | 95 ++++++++++----------
 10 files changed, 214 insertions(+), 204 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala index 3f80e8f..383f39f 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala @@ -24,6 +24,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.SKeyValue import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId} +import org.apache.s2graph.s2jobs.loader.GraphFileOptions import play.api.libs.json.Json import scala.concurrent.ExecutionContext @@ -54,8 +55,8 @@ object S2GraphHelper { } } - private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, createRelEdges: Boolean = true): Seq[SKeyValue] = { - val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) + private def insertBulkForLoaderAsync(s2: S2Graph, edge: S2Edge, option: GraphFileOptions): Seq[SKeyValue] = { + val relEdges = if (option.autoEdgeCreate) edge.relatedEdges else List(edge) val snapshotEdgeKeyValues = s2.getStorage(edge.toSnapshotEdge.label).serDe.snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues val indexEdgeKeyValues = relEdges.flatMap { edge => @@ -67,15 +68,20 @@ object S2GraphHelper { snapshotEdgeKeyValues ++ indexEdgeKeyValues } - def toSKeyValues(s2: S2Graph, element: GraphElement, autoEdgeCreate: Boolean = false): Seq[SKeyValue] = { - if (element.isInstanceOf[S2Edge]) { - val edge = element.asInstanceOf[S2Edge] - insertBulkForLoaderAsync(s2, edge, autoEdgeCreate) - } else if (element.isInstanceOf[S2Vertex]) { - val vertex = element.asInstanceOf[S2Vertex] - s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues - } else { - Nil + def toSKeyValues(s2: S2Graph, element: GraphElement, option: GraphFileOptions): Seq[SKeyValue] = { + try { + if
(element.isInstanceOf[S2Edge]) { + val edge = element.asInstanceOf[S2Edge] + insertBulkForLoaderAsync(s2, edge, option) + } else if (element.isInstanceOf[S2Vertex]) { + val vertex = element.asInstanceOf[S2Vertex] + s2.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues + } else { + Nil + } + } catch { + case e: Exception => + if (option.skipError) Nil else throw e } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala index 3e3ffb9..e855a32 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/GraphFileOptions.scala @@ -50,12 +50,10 @@ object GraphFileOptions { c.copy(dbDriver = x)).text("jdbc driver class.") opt[Int]('h', "maxHFilePerRegionServer").action ( (x, c) => - c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer." - ) + c.copy(maxHFilePerRegionServer = x)).text("maximum number of HFile per RegionServer.") opt[Int]('n', "numRegions").action ( (x, c) => - c.copy(numRegions = x)).text("total numRegions(pre-split size) on table." - ) + c.copy(numRegions = x)).text("total numRegions(pre-split size) on table.") opt[String]('l', "labelMapping").action( (x, c) => c.copy(labelMapping = toLabelMapping(x)) ).text("mapping info to change the label from source (originalLabel:newLabel)") @@ -67,8 +65,11 @@ object GraphFileOptions { c.copy(autoEdgeCreate = x)).text("generate reverse edge automatically") opt[Boolean]('c', "incrementalLoad").action( (x, c) => - c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not." - ) + c.copy(incrementalLoad = x)).text("whether incremental bulkload which append data on existing table or not.") + + opt[Boolean]('s', "skipError").action ((x, c) => + c.copy(skipError = x)).text("whether skip error row.") + opt[String]('m', "method").action( (x, c) => c.copy(method = x)).text("run method. currently MR(default)/SPARK supported." 
) @@ -124,6 +125,7 @@ case class GraphFileOptions(input: String = "", autoEdgeCreate: Boolean = false, buildDegree: Boolean = false, incrementalLoad: Boolean = false, + skipError: Boolean = false, compressionAlgorithm: String = "NONE", method: String = "SPARK") { def toConfigParams = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala index b4ac51f..8ace94a 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileGenerator.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, KeyValue, TableName} import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement +import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer import org.apache.s2graph.s2jobs.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala index 3502bee..fd78718 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat} import org.apache.hadoop.mapreduce.{Job, Mapper} +import org.apache.s2graph.s2jobs.serde.SparkBulkLoaderTransformer import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala deleted file mode 100644 index 7d405a6..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.s2jobs.loader - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.KeyValue -import org.apache.s2graph.core.{GraphElement, S2Graph} -import org.apache.s2graph.s2jobs.serde.Transformer -import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} - -import scala.concurrent.ExecutionContext - -class LocalBulkLoaderTransformer(val config: Config, - val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] { - val s2: S2Graph = S2GraphHelper.initS2Graph(config) - - override val reader = new TsvBulkFormatReader - override val writer = new KeyValueWriter - - override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_)) - - override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_)) - - override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = { - val degrees = elements.flatMap { element => - DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) - }.groupBy(_._1).mapValues(_.map(_._2).sum) - - degrees.toSeq.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) - } - } - - override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = { - val elements = read(input) - val kvs = write(elements) - - val degrees = if (options.buildDegree) buildDegrees(elements) else Nil - - kvs ++ degrees - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala deleted file mode 100644 index cd991e1..0000000 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.s2jobs.loader - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.{KeyValue => HKeyValue} -import org.apache.s2graph.core.GraphElement -import org.apache.s2graph.s2jobs.serde.Transformer -import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader -import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} -import org.apache.spark.rdd.RDD - -class SparkBulkLoaderTransformer(val config: Config, - val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] { - val reader = new TsvBulkFormatReader - - val writer = new KeyValueWriter - - override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.flatMap { line => - reader.read(s2)(line) - } - } - - override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map(writer.write(s2)(_)) - } - - override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = { - val degrees = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.flatMap { element => - DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) - } - }.reduceByKey(_ + _) - - degrees.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) - - iter.map { case (degreeKey, count) => - DegreeKey.toKeyValue(s2, degreeKey, count) - } - } - } - - override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = { - val elements = read(input) - val kvs = write(elements) - - if (options.buildDegree) kvs ++ buildDegrees(elements) - kvs - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala new file mode 100644 index 0000000..a185754 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/LocalBulkLoaderTransformer.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.serde + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.KeyValue +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.apache.s2graph.s2jobs.loader.GraphFileOptions +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} + +import scala.concurrent.ExecutionContext + +class LocalBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[String, Seq[KeyValue], Seq] { + val s2: S2Graph = S2GraphHelper.initS2Graph(config) + + override val reader = new TsvBulkFormatReader + override val writer = new KeyValueWriter(options) + + override def read(input: Seq[String]): Seq[GraphElement] = input.flatMap(reader.read(s2)(_)) + + override def write(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = elements.map(writer.write(s2)(_)) + + override def buildDegrees(elements: Seq[GraphElement]): Seq[Seq[KeyValue]] = { + val degrees = elements.flatMap { element => + DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) + }.groupBy(_._1).mapValues(_.map(_._2).sum) + + degrees.toSeq.map { case (degreeKey, count) => + DegreeKey.toKeyValue(s2, degreeKey, count) + } + } + + override def transform(input: Seq[String]): Seq[Seq[KeyValue]] = { + val elements = read(input) + val kvs = write(elements) + + val degrees = if (options.buildDegree) buildDegrees(elements) else Nil + + kvs ++ degrees + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala new file mode 100644 index 0000000..63f4e2c --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/SparkBulkLoaderTransformer.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.s2jobs.serde + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.{KeyValue => HKeyValue} +import org.apache.s2graph.core.GraphElement +import org.apache.s2graph.s2jobs.loader.GraphFileOptions +import org.apache.s2graph.s2jobs.serde.reader.TsvBulkFormatReader +import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter +import org.apache.s2graph.s2jobs.{DegreeKey, S2GraphHelper} +import org.apache.spark.rdd.RDD + +class SparkBulkLoaderTransformer(val config: Config, + val options: GraphFileOptions) extends Transformer[String, Seq[HKeyValue], org.apache.spark.rdd.RDD] { + val reader = new TsvBulkFormatReader + + val writer = new KeyValueWriter(options) + + override def read(input: RDD[String]): RDD[GraphElement] = input.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { line => + reader.read(s2)(line) + } + } + + override def write(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map(writer.write(s2)(_)) + } + + override def buildDegrees(elements: RDD[GraphElement]): RDD[Seq[HKeyValue]] = { + val degrees = elements.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.flatMap { element => + DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) + } + }.reduceByKey(_ + _) + + degrees.mapPartitions { iter => + val s2 = S2GraphHelper.initS2Graph(config) + + iter.map { case (degreeKey, count) => + DegreeKey.toKeyValue(s2, degreeKey, count) + } + } + } + + override def transform(input: RDD[String]): RDD[Seq[HKeyValue]] = { + val elements = read(input) + val kvs = write(elements) + + if (options.buildDegree) kvs ++ buildDegrees(elements) + kvs + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala index 02034af..22eee34 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/serde/writer/KeyValueWriter.scala @@ -22,11 +22,12 @@ package org.apache.s2graph.s2jobs.serde.writer import org.apache.hadoop.hbase.KeyValue import org.apache.s2graph.core.{GraphElement, S2Graph} import org.apache.s2graph.s2jobs.S2GraphHelper +import org.apache.s2graph.s2jobs.loader.GraphFileOptions import org.apache.s2graph.s2jobs.serde.GraphElementWritable -class KeyValueWriter(autoEdgeCreate: Boolean = false) extends GraphElementWritable[Seq[KeyValue]] { +class KeyValueWriter(option: GraphFileOptions) extends GraphElementWritable[Seq[KeyValue]] { override def write(s2: S2Graph)(element: GraphElement): Seq[KeyValue] = { - S2GraphHelper.toSKeyValues(s2, element, autoEdgeCreate).map { skv => + S2GraphHelper.toSKeyValues(s2, element, option).map { skv => new KeyValue(skv.row, skv.cf, skv.qualifier, skv.timestamp, skv.value) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/63dd6fa2/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala index 3fbbd88..3bd1a23 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/loader/GraphFileGeneratorTest.scala @@ -19,20 +19,18 @@ package org.apache.s2graph.s2jobs.loader -import org.apache.hadoop.hbase.HBaseConfiguration -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -import org.apache.hadoop.util.ToolRunner -import org.apache.s2graph.core.{PostProcess, S2VertexLike} +import org.apache.s2graph.core.PostProcess import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue} import org.apache.s2graph.s2jobs.BaseSparkTest +import org.apache.s2graph.s2jobs.serde.{LocalBulkLoaderTransformer, SparkBulkLoaderTransformer} import play.api.libs.json.Json -import scala.io.Source - class GraphFileGeneratorTest extends BaseSparkTest { - import scala.concurrent.ExecutionContext.Implicits.global + import org.apache.hadoop.hbase.{KeyValue => HKeyValue} + import scala.concurrent.ExecutionContext.Implicits.global + def transformToSKeyValues(transformerMode: String, edges: Seq[String]): List[SKeyValue] = { transformerMode match { case "spark" => @@ -56,6 +54,7 @@ class GraphFileGeneratorTest extends BaseSparkTest { }.toList } } + test("test generateKeyValues edge only. SparkBulkLoaderTransformer") { val label = initTestEdgeSchema(s2, tableName, schemaVersion, compressionAlgorithm) /* end of initialize model */ @@ -121,7 +120,7 @@ class GraphFileGeneratorTest extends BaseSparkTest { println(Json.prettyPrint(jsValue)) } - bulkVertex shouldBe(vertex) + bulkVertex shouldBe (vertex) } test("test generateKeyValues vertex only. LocalBulkLoaderTransformer") { @@ -140,46 +139,46 @@ class GraphFileGeneratorTest extends BaseSparkTest { println(Json.prettyPrint(jsValue)) } - bulkVertex shouldBe(vertex) + bulkVertex shouldBe (vertex) } -// this test case expect options.input already exist with valid bulk load format. -// test("bulk load and fetch vertex: spark mode") { -// import scala.collection.JavaConverters._ -// val serviceColumn = initTestVertexSchema(s2) -// -// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq -// val input = sc.parallelize(bulkVertexLs) -// -// HFileGenerator.generate(sc, s2Config, input, options) -// -// val hfileArgs = Array(options.output, options.tableName) -// val hbaseConfig = HBaseConfiguration.create() -// -// val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) -// -// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) -// val json = PostProcess.verticesToJson(s2Vertices) -// -// println(Json.prettyPrint(json)) -// } - -// this test case expect options.input already exist with valid bulk load format. 
-// test("bulk load and fetch vertex: mr mode") { -// val serviceColumn = initTestVertexSchema(s2) -// -// val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq -// val input = sc.parallelize(bulkVertexLs) -// -// HFileMRGenerator.generate(sc, s2Config, input, options) -// -// val hfileArgs = Array(options.output, options.tableName) -// val hbaseConfig = HBaseConfiguration.create() -// -// val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) -// val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) -// val json = PostProcess.verticesToJson(s2Vertices) -// -// println(Json.prettyPrint(json)) -// } + // this test case expect options.input already exist with valid bulk load format. + // test("bulk load and fetch vertex: spark mode") { + // import scala.collection.JavaConverters._ + // val serviceColumn = initTestVertexSchema(s2) + // + // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq + // val input = sc.parallelize(bulkVertexLs) + // + // HFileGenerator.generate(sc, s2Config, input, options) + // + // val hfileArgs = Array(options.output, options.tableName) + // val hbaseConfig = HBaseConfiguration.create() + // + // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) + // + // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) + // val json = PostProcess.verticesToJson(s2Vertices) + // + // println(Json.prettyPrint(json)) + // } + + // this test case expect options.input already exist with valid bulk load format. + // test("bulk load and fetch vertex: mr mode") { + // val serviceColumn = initTestVertexSchema(s2) + // + // val bulkVertexLs = Source.fromFile(options.input).getLines().toSeq + // val input = sc.parallelize(bulkVertexLs) + // + // HFileMRGenerator.generate(sc, s2Config, input, options) + // + // val hfileArgs = Array(options.output, options.tableName) + // val hbaseConfig = HBaseConfiguration.create() + // + // val ret = ToolRunner.run(hbaseConfig, new LoadIncrementalHFiles(hbaseConfig), hfileArgs) + // val s2Vertices = s2.vertices().asScala.toSeq.map(_.asInstanceOf[S2VertexLike]) + // val json = PostProcess.verticesToJson(s2Vertices) + // + // println(Json.prettyPrint(json)) + // } }