Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 5c85dd422 -> 9dc39ee37
reuse s2graph object Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6da382b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6da382b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6da382b9 Branch: refs/heads/master Commit: 6da382b9dfee810df0f51270d7c03f62e72b1874 Parents: 6d231e2 Author: Chul Kang <[email protected]> Authored: Wed Apr 18 16:37:25 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Wed Apr 18 16:37:25 2018 +0900 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala | 12 +++++++++--- .../s2jobs/loader/LocalBulkLoaderTransformer.scala | 2 +- .../s2jobs/loader/SparkBulkLoaderTransformer.scala | 8 ++++---- .../scala/org/apache/s2graph/s2jobs/task/Sink.scala | 12 ++++-------- .../s2graph/spark/sql/streaming/S2SinkContext.scala | 2 ++ .../spark/sql/streaming/S2StreamQueryWriter.scala | 5 ++--- .../scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala | 6 ++---- 7 files changed, 24 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala index 6e68d28..b65af21 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala @@ -31,9 +31,15 @@ import play.api.libs.json.{JsObject, Json} import scala.concurrent.ExecutionContext import scala.util.Try -object S2GraphHelper { - def initS2Graph(config: Config)(implicit ec: ExecutionContext = 
ExecutionContext.Implicits.global): S2Graph = { - new S2Graph(config) +object S2GraphHelper extends Logger { + private var s2Graph:S2Graph = null + + def getS2Graph(config: Config, init:Boolean = false)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): S2Graph = { + if (s2Graph == null || init) { + logger.info(s"S2Graph initialized..") + s2Graph = new S2Graph(config) + } + s2Graph } def buildDegreePutRequests(s2: S2Graph, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala index d3ed6bc..68f9a0e 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala @@ -29,7 +29,7 @@ import scala.reflect.ClassTag class LocalBulkLoaderTransformer(val config: Config, val options: GraphFileOptions)(implicit ec: ExecutionContext) extends Transformer[Seq] { - val s2: S2Graph = S2GraphHelper.initS2Graph(config) + val s2: S2Graph = S2GraphHelper.getS2Graph(config) override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit writer: GraphElementWritable[T]): Seq[T] = { val degrees = elements.flatMap { element => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala index eec69b9..36b585e 100644 
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala @@ -32,7 +32,7 @@ class SparkBulkLoaderTransformer(val config: Config, override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit writer: GraphElementWritable[T]): RDD[T] = { val degrees = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) + val s2 = S2GraphHelper.getS2Graph(config) iter.flatMap { element => DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 1L) @@ -40,7 +40,7 @@ class SparkBulkLoaderTransformer(val config: Config, }.reduceByKey(_ + _) degrees.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) + val s2 = S2GraphHelper.getS2Graph(config) iter.map { case (degreeKey, count) => writer.writeDegree(s2)(degreeKey, count) @@ -50,7 +50,7 @@ class SparkBulkLoaderTransformer(val config: Config, override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = { val elements = input.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) + val s2 = S2GraphHelper.getS2Graph(config) iter.flatMap { line => reader.read(s2)(line) @@ -58,7 +58,7 @@ class SparkBulkLoaderTransformer(val config: Config, } val kvs = elements.mapPartitions { iter => - val s2 = S2GraphHelper.initS2Graph(config) + val s2 = S2GraphHelper.getS2Graph(config) iter.map(writer.write(s2)(_)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala index 58fe2b5..6e483bb 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala 
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala @@ -20,20 +20,15 @@ package org.apache.s2graph.s2jobs.task import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} -import org.apache.hadoop.hbase.HBaseConfiguration -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -import org.apache.hadoop.util.ToolRunner import org.apache.s2graph.core.Management import org.apache.s2graph.s2jobs.S2GraphHelper -import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer} +import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer} import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter -import org.apache.s2graph.spark.sql.streaming.S2SinkContext import org.apache.spark.sql._ import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger} import org.elasticsearch.spark.sql.EsSparkSQL -import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -233,9 +228,10 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con } private def writeBatchWithMutate(df:DataFrame):Unit = { - import scala.collection.JavaConversions._ import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._ + import scala.collection.JavaConversions._ + val graphConfig: Config = ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load()) val serializedConfig = graphConfig.root().render(ConfigRenderOptions.concise()) @@ -246,7 +242,7 @@ class S2graphSink(queryName: String, conf: TaskConf) extends Sink(queryName, con df.foreachPartition { iters => val config = ConfigFactory.parseString(serializedConfig) - val s2Graph = S2GraphHelper.initS2Graph(config) + val s2Graph = S2GraphHelper.getS2Graph(config) val responses = iters.grouped(groupedSize).flatMap { rows => val elements = rows.flatMap(row => reader.read(s2Graph)(row)) 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala index 951d9ae..49581c6 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala @@ -24,6 +24,7 @@ import org.apache.s2graph.core.S2Graph import scala.concurrent.ExecutionContext +@deprecated class S2SinkContext(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global){ println(s">>>> S2SinkContext Created...") private lazy val s2Graph = new S2Graph(config) @@ -32,6 +33,7 @@ class S2SinkContext(config: Config)(implicit ec: ExecutionContext = ExecutionCon } } +@deprecated object S2SinkContext { private var s2SinkContext:S2SinkContext = null http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala index f6fecd7..2689f6e 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala @@ -41,7 +41,7 @@ private [sql] class S2StreamQueryWriter( commitProtocol: S2CommitProtocol ) extends Serializable with Logger { private val config = ConfigFactory.parseString(serializedConf) - private val s2SinkContext = S2SinkContext(config) + private val s2Graph = 
S2GraphHelper.getS2Graph(config) private val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind() private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction") @@ -50,7 +50,6 @@ private [sql] class S2StreamQueryWriter( val taskId = s"stage-${taskContext.stageId()}, partition-${taskContext.partitionId()}, attempt-${taskContext.taskAttemptId()}" val partitionId= taskContext.partitionId() - val s2Graph = s2SinkContext.getGraph val groupedSize = getConfigString(config, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt val waitTime = getConfigString(config, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt @@ -85,5 +84,5 @@ private [sql] class S2StreamQueryWriter( } private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = - S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN) + S2GraphHelper.sparkSqlRowToGraphElement(s2Graph, encoder.fromRow(internalRow), schema, RESERVED_COLUMN) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala index 0461d1e..3fa8cea 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala @@ -24,10 +24,9 @@ import java.io.{File, PrintWriter} import com.holdenkarau.spark.testing.DataFrameSuiteBase import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} -import org.apache.s2graph.core.{Management, S2Graph} import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.core.{Management, S2Graph} import 
org.apache.s2graph.s2jobs.loader.GraphFileOptions -import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import scala.util.Try @@ -65,7 +64,7 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D // initialize spark context. super.beforeAll() - s2 = S2GraphHelper.initS2Graph(s2Config) + s2 = S2GraphHelper.getS2Graph(s2Config) initTestDataFile } @@ -103,7 +102,6 @@ class BaseSparkTest extends FunSuite with Matchers with BeforeAndAfterAll with D } def initTestVertexSchema(s2: S2Graph): ServiceColumn = { - import scala.collection.JavaConverters._ /* initialize model for test */ val management = s2.management
