Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 5c85dd422 -> 9dc39ee37


reuse s2graph object


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6da382b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6da382b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6da382b9

Branch: refs/heads/master
Commit: 6da382b9dfee810df0f51270d7c03f62e72b1874
Parents: 6d231e2
Author: Chul Kang <[email protected]>
Authored: Wed Apr 18 16:37:25 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Wed Apr 18 16:37:25 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala | 12 +++++++++---
 .../s2jobs/loader/LocalBulkLoaderTransformer.scala      |  2 +-
 .../s2jobs/loader/SparkBulkLoaderTransformer.scala      |  8 ++++----
 .../scala/org/apache/s2graph/s2jobs/task/Sink.scala     | 12 ++++--------
 .../s2graph/spark/sql/streaming/S2SinkContext.scala     |  2 ++
 .../spark/sql/streaming/S2StreamQueryWriter.scala       |  5 ++---
 .../scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala |  6 ++----
 7 files changed, 24 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index 6e68d28..b65af21 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -31,9 +31,15 @@ import play.api.libs.json.{JsObject, Json}
 import scala.concurrent.ExecutionContext
 import scala.util.Try
 
-object S2GraphHelper {
-  def initS2Graph(config: Config)(implicit ec: ExecutionContext = 
ExecutionContext.Implicits.global): S2Graph = {
-    new S2Graph(config)
+object S2GraphHelper extends Logger {
+  private var s2Graph:S2Graph = null
+
+  def getS2Graph(config: Config, init:Boolean = false)(implicit ec: 
ExecutionContext = ExecutionContext.Implicits.global): S2Graph = {
+    if (s2Graph == null || init) {
+      logger.info(s"S2Graph initialized..")
+      s2Graph = new S2Graph(config)
+    }
+    s2Graph
   }
 
   def buildDegreePutRequests(s2: S2Graph,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
index d3ed6bc..68f9a0e 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/LocalBulkLoaderTransformer.scala
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 
 class LocalBulkLoaderTransformer(val config: Config,
                                  val options: GraphFileOptions)(implicit ec: 
ExecutionContext) extends Transformer[Seq] {
-  val s2: S2Graph = S2GraphHelper.initS2Graph(config)
+  val s2: S2Graph = S2GraphHelper.getS2Graph(config)
 
   override def buildDegrees[T: ClassTag](elements: Seq[GraphElement])(implicit 
writer: GraphElementWritable[T]): Seq[T] = {
     val degrees = elements.flatMap { element =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
index eec69b9..36b585e 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/SparkBulkLoaderTransformer.scala
@@ -32,7 +32,7 @@ class SparkBulkLoaderTransformer(val config: Config,
 
   override def buildDegrees[T: ClassTag](elements: RDD[GraphElement])(implicit 
writer: GraphElementWritable[T]): RDD[T] = {
     val degrees = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.flatMap { element =>
         DegreeKey.fromGraphElement(s2, element, options.labelMapping).map(_ -> 
1L)
@@ -40,7 +40,7 @@ class SparkBulkLoaderTransformer(val config: Config,
     }.reduceByKey(_ + _)
 
     degrees.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.map { case (degreeKey, count) =>
         writer.writeDegree(s2)(degreeKey, count)
@@ -50,7 +50,7 @@ class SparkBulkLoaderTransformer(val config: Config,
 
   override def transform[S: ClassTag, T: ClassTag](input: RDD[S])(implicit 
reader: GraphElementReadable[S], writer: GraphElementWritable[T]): RDD[T] = {
     val elements = input.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.flatMap { line =>
         reader.read(s2)(line)
@@ -58,7 +58,7 @@ class SparkBulkLoaderTransformer(val config: Config,
     }
 
     val kvs = elements.mapPartitions { iter =>
-      val s2 = S2GraphHelper.initS2Graph(config)
+      val s2 = S2GraphHelper.getS2Graph(config)
 
       iter.map(writer.write(s2)(_))
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 58fe2b5..6e483bb 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,20 +20,15 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.Management
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, 
SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, 
SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
-import scala.collection.mutable.ListBuffer
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -233,9 +228,10 @@ class S2graphSink(queryName: String, conf: TaskConf) 
extends Sink(queryName, con
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
-    import scala.collection.JavaConversions._
     import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
 
+    import scala.collection.JavaConversions._
+
     val graphConfig: Config = 
ConfigFactory.parseMap(conf.options).withFallback(ConfigFactory.load())
     val serializedConfig = 
graphConfig.root().render(ConfigRenderOptions.concise())
 
@@ -246,7 +242,7 @@ class S2graphSink(queryName: String, conf: TaskConf) 
extends Sink(queryName, con
 
     df.foreachPartition { iters =>
       val config = ConfigFactory.parseString(serializedConfig)
-      val s2Graph = S2GraphHelper.initS2Graph(config)
+      val s2Graph = S2GraphHelper.getS2Graph(config)
 
       val responses = iters.grouped(groupedSize).flatMap { rows =>
         val elements = rows.flatMap(row => reader.read(s2Graph)(row))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
index 951d9ae..49581c6 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
@@ -24,6 +24,7 @@ import org.apache.s2graph.core.S2Graph
 
 import scala.concurrent.ExecutionContext
 
+@deprecated
 class S2SinkContext(config: Config)(implicit ec: ExecutionContext = 
ExecutionContext.Implicits.global){
   println(s">>>> S2SinkContext Created...")
   private lazy val s2Graph = new S2Graph(config)
@@ -32,6 +33,7 @@ class S2SinkContext(config: Config)(implicit ec: 
ExecutionContext = ExecutionCon
   }
 }
 
+@deprecated
 object S2SinkContext {
   private var s2SinkContext:S2SinkContext = null
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
index f6fecd7..2689f6e 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -41,7 +41,7 @@ private [sql] class S2StreamQueryWriter(
                                          commitProtocol: S2CommitProtocol
                                        ) extends Serializable with Logger {
   private val config = ConfigFactory.parseString(serializedConf)
-  private val s2SinkContext = S2SinkContext(config)
+  private val s2Graph = S2GraphHelper.getS2Graph(config)
   private val encoder: ExpressionEncoder[Row] = 
RowEncoder(schema).resolveAndBind()
   private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", 
"operation", "elem", "direction")
 
@@ -50,7 +50,6 @@ private [sql] class S2StreamQueryWriter(
     val taskId = s"stage-${taskContext.stageId()}, 
partition-${taskContext.partitionId()}, attempt-${taskContext.taskAttemptId()}"
     val partitionId= taskContext.partitionId()
 
-    val s2Graph = s2SinkContext.getGraph
     val groupedSize = getConfigString(config, S2_SINK_GROUPED_SIZE, 
DEFAULT_GROUPED_SIZE).toInt
     val waitTime = getConfigString(config, S2_SINK_WAIT_TIME, 
DEFAULT_WAIT_TIME_SECONDS).toInt
 
@@ -85,5 +84,5 @@ private [sql] class S2StreamQueryWriter(
   }
 
   private def rowToEdge(internalRow:InternalRow): Option[GraphElement] =
-    S2GraphHelper.sparkSqlRowToGraphElement(s2SinkContext.getGraph, 
encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
+    S2GraphHelper.sparkSqlRowToGraphElement(s2Graph, 
encoder.fromRow(internalRow), schema, RESERVED_COLUMN)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6da382b9/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 0461d1e..3fa8cea 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -24,10 +24,9 @@ import java.io.{File, PrintWriter}
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
-import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import scala.util.Try
@@ -65,7 +64,7 @@ class BaseSparkTest extends FunSuite with Matchers with 
BeforeAndAfterAll with D
     // initialize spark context.
     super.beforeAll()
 
-    s2 = S2GraphHelper.initS2Graph(s2Config)
+    s2 = S2GraphHelper.getS2Graph(s2Config)
     initTestDataFile
   }
 
@@ -103,7 +102,6 @@ class BaseSparkTest extends FunSuite with Matchers with 
BeforeAndAfterAll with D
   }
 
   def initTestVertexSchema(s2: S2Graph): ServiceColumn = {
-    import scala.collection.JavaConverters._
     /* initialize model for test */
     val management = s2.management
 

Reply via email to