Repository: incubator-s2graph
Updated Branches:
  refs/heads/master b58ba20b1 -> d37888ff2


[S2GRAPH-127] Refactor ExceptionHandler Object into Class.
- To publish all incoming requests (WAL log) into a Kafka topic.
- To publish failed mutations into a Kafka topic when Storage fails to mutate
the backend storage.
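
In short, the process-wide ExceptionHandler singleton (backed by a KafkaAggregatorActor) becomes a plain class that owns its KafkaProducer directly. A minimal before/after sketch of the call-site change, using only names that appear in the diff below:

    // before: global object, initialized once per process
    ExceptionHandler.apply(config)
    ExceptionHandler.enqueue(kafkaMessage)
    ExceptionHandler.shutdown()

    // after: an instance per owner (Storage, Bootstrap), passed to collaborators
    val handler = new ExceptionHandler(config)
    handler.enqueue(kafkaMessage)
    handler.shutdown()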


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6b5918b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6b5918b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6b5918b1

Branch: refs/heads/master
Commit: 6b5918b1e21765164356ffb88f4b331a1257691a
Parents: 1c2bd04
Author: daewon <[email protected]>
Authored: Thu Apr 21 11:48:28 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Nov 10 18:05:53 2016 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/ExceptionHandler.scala  | 161 ++++++-------------
 .../apache/s2graph/core/storage/Storage.scala   |  16 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |   1 +
 .../apache/s2graph/rest/play/Bootstrap.scala    |  35 ++--
 .../s2graph/rest/play/actors/QueueActor.scala   |  10 +-
 .../controllers/ApplicationController.scala     |   9 ++
 .../play/controllers/CounterController.scala    |   3 +-
 .../rest/play/controllers/EdgeController.scala  |  21 +--
 .../play/controllers/PublishController.scala    |   3 +-
 .../play/controllers/VertexController.scala     |   7 +-
 10 files changed, 108 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index 29fc2dd..d03d483 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -20,75 +20,76 @@
 package org.apache.s2graph.core
 
 import java.util.Properties
-import java.util.concurrent.atomic.AtomicLong
 
-import akka.actor._
-import akka.routing.{Broadcast, RoundRobinPool}
 import com.typesafe.config.Config
 import org.apache.kafka.clients.producer._
+import org.apache.s2graph.core.utils.logger
 
-import scala.concurrent.duration._
+class ExceptionHandler(config: Config) {
+  import ExceptionHandler._
 
-object ExceptionHandler {
+  val keyBrokerList = "kafka.metadata.broker.list"
+  val phase = if (config.hasPath("phase")) config.getString("phase") else "dev"
+  val useKafka = config.hasPath(keyBrokerList) && config.getString(keyBrokerList) != "localhost"
 
-  var producer: Option[Producer[Key, Val]] = None
-  var properties: Option[Properties] = None
-  val numOfRoutees = 1
-  val actorSystem = ActorSystem("ExceptionHandler")
-  var routees: Option[ActorRef] = None
-  var shutdownTime = 1000 millis
-  var phase = "dev"
-  lazy val failTopic = s"mutateFailed_${phase}"
-
-  def apply(config: Config) = {
-    properties =
-      if (config.hasPath("kafka.metadata.broker.list")) Option(kafkaConfig(config))
-      else None
-    phase = if (config.hasPath("phase")) config.getString("phase") else "dev"
-    producer = for {
-      props <- properties
-      p <- try {
-        Option(new KafkaProducer[Key, Val](props))
+  val producer: Option[KafkaProducer[Key, Val]] =
+    if (useKafka) {
+      try {
+        Option(new KafkaProducer[Key, Val](toKafkaProp(config)))
       } catch {
-        case e: Throwable => None
-      }
-    } yield {
-        p
+        case e: Exception =>
+          logger.error(s"Initialize kafka fail with: ${toKafkaProp(config)}")
+          None
       }
-    init()
-  }
+    } else None
 
-  def props(producer: Producer[Key, Val]) = Props(classOf[KafkaAggregatorActor], producer)
 
-  def init() = {
-    for {
-      p <- producer
-    } {
-      routees = Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p))))
+  def enqueue(m: KafkaMessage): Unit = {
+    producer match {
+      case None => logger.debug(s"skip log to Kafka: ${m}")
+      case Some(kafka) =>
+        kafka.send(m.msg, new Callback() {
+          override def onCompletion(meta: RecordMetadata, e: Exception) = {
+            if (e == null) {
+              // success
+            } else {
+              logger.error(s"log publish failed: ${m}", e)
+              // failure
+            }
+          }
+        })
     }
   }
 
-  def shutdown() = {
-    routees.map(_ ! Broadcast(PoisonPill))
-    Thread.sleep(shutdownTime.length)
-  }
+  def shutdown() = producer.foreach(_.close)
+}
 
-  def enqueues(msgs: Seq[KafkaMessage]) = {
-    msgs.foreach(enqueue)
+object ExceptionHandler {
+  type Key = String
+  type Val = String
+
+  def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = {
+    KafkaMessage(
+      new ProducerRecord[Key, Val](
+        topic,
+        element.queuePartitionKey,
+        originalString.getOrElse(element.toLogString())))
   }
 
-  def enqueue(msg: KafkaMessage) = {
-    routees.map(_ ! msg)
+  def toKafkaMessage(topic: String, tsv: String) = {
+    KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
   }
 
+  case class KafkaMessage(msg: ProducerRecord[Key, Val])
 
-  def kafkaConfig(config: Config) = {
-    val props = new Properties();
+  private def toKafkaProp(config: Config) = {
+    val props = new Properties()
 
     /* all default configuration for new producer */
     val brokers =
      if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
       else "localhost"
+
     props.put("bootstrap.servers", brokers)
     props.put("acks", "1")
     props.put("buffer.memory", "33554432")
@@ -103,76 +104,8 @@ object ExceptionHandler {
     props.put("block.on.buffer.full", "false")
     props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
     props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")
-    props
-  }
-
-  type Key = String
-  type Val = String
-
-  def toKafkaMessage(topic: String = failTopic, element: GraphElement, originalString: Option[String] = None) = {
-    KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey,
-      originalString.getOrElse(element.toLogString())))
-  }
-
-  case class KafkaMessage(msg: ProducerRecord[Key, Val])
-
-  case class Message(topic: String, msg: String)
-
-  case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], bufferSize: Int)
-
-  case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: Int)
 
-  case object FlushBuffer
-
-  case class UpdateHealth(isHealty: Boolean)
-
-  case object ShowMetrics
-
-}
-
-class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends Stash with ActorLogging {
-
-  import ExceptionHandler._
-
-  val failedCount = new AtomicLong(0L)
-  val successCount = new AtomicLong(0L)
-  val stashCount = new AtomicLong(0L)
-
-  implicit val ex = context.system.dispatcher
-
-  context.system.scheduler.schedule(0 millis, 10 seconds) {
-    self ! ShowMetrics
+    props
   }
 
-  override def receive = {
-    case ShowMetrics =>
-      log.info(s"[Stats]: failed[${failedCount.get}], 
stashed[${stashCount.get}], success[${successCount.get}]")
-
-    case m: KafkaMessage =>
-      val replayTo = self
-      try {
-        kafkaProducer.send(m.msg, new Callback() {
-          override def onCompletion(meta: RecordMetadata, e: Exception) = {
-            if (e == null) {
-              // success
-              successCount.incrementAndGet()
-              unstashAll()
-              stashCount.set(0L)
-            } else {
-              // failure
-              log.error(s"onCompletion: $e", e)
-              failedCount.incrementAndGet()
-              replayTo ! m
-            }
-          }
-        })
-      } catch {
-        case e@(_: org.apache.kafka.clients.producer.BufferExhaustedException | _: Throwable) =>
-          log.error(s"$e", e)
-          log.info(s"stash")
-          stash()
-          stashCount.incrementAndGet()
-      }
-
-  }
 }

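For context, a minimal usage sketch of the new class defined above. The broker address and topic name are illustrative placeholders; when no usable broker list is configured (key absent, or set to "localhost"), enqueue only logs the message at debug level instead of publishing:

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.ExceptionHandler

    val config = ConfigFactory.parseString(
      """
        |kafka.metadata.broker.list = "broker-1:9092"
        |phase = "dev"
      """.stripMargin)

    // builds a KafkaProducer only when useKafka holds
    val handler = new ExceptionHandler(config)

    // fire-and-forget send; failures are logged in the producer callback
    handler.enqueue(ExceptionHandler.toKafkaMessage("wal_log_dev", "some\ttsv\tline"))

    handler.shutdown() // closes the underlying producer
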
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 70e47a7..341f405 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -57,6 +57,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
   /** retry scheduler */
   val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
 
+  /** handle mutate failed */
+  val exceptionHandler = new ExceptionHandler(config)
+
   val failTopic = s"mutateFailed_${config.getString("phase")}"
 
   /**
@@ -247,7 +250,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
   * this method need to be called when client shutdown. this is responsible to cleanUp the resources
    * such as client into storage.
    */
-  def flush(): Unit
+  def flush(): Unit = {
+    exceptionHandler.shutdown()
+  }
 
   /**
    * create table on storage.
@@ -339,6 +344,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
       strong ++ weak
     }
   }
+
   def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = {
 
     val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq
@@ -429,7 +435,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
 
        val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element = edge)
-        ExceptionHandler.enqueue(kafkaMessage)
+        exceptionHandler.enqueue(kafkaMessage)
       }
 
       Future.successful(false)
@@ -946,12 +952,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
         label <- labels
       } yield {
          val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t")
-          val topic = ExceptionHandler.failTopic
-          val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
-          kafkaMsg
+          ExceptionHandler.toKafkaMessage(failTopic, tsv)
         }
 
-      ExceptionHandler.enqueues(kafkaMessages)
+      kafkaMessages.foreach(exceptionHandler.enqueue)
     }
 
     val requestTs = ts

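Worth noting from the hunks above: each Storage instance now owns its handler, flush() closes it, and two toKafkaMessage overloads feed the per-phase fail topic. A hedged sketch of both overloads (edge and tsv stand for values available at the call sites):

    // GraphElement variant: record keyed by element.queuePartitionKey,
    // payload defaulting to element.toLogString()
    val m1 = ExceptionHandler.toKafkaMessage(failTopic, element = edge)

    // raw-TSV variant (deleteAll path): record sent with a null key
    val m2 = ExceptionHandler.toKafkaMessage(failTopic, tsv)

    exceptionHandler.enqueue(m1)
    exceptionHandler.enqueue(m2)
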
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 783fd8a..7391259 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -365,6 +365,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
 
 
   override def flush(): Unit = clients.foreach { client =>
+    super.flush()
     val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
     Await.result(client.flush().toFuture, timeout)
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index 570c23c..30a5ee4 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -40,11 +40,9 @@ object Global extends WithFilters(new GzipFilter()) {
   var storageManagement: Management = _
   var s2parser: RequestParser = _
   var s2rest: RestHandler = _
+  var wallLogHandler: ExceptionHandler = _
 
-  // Application entry point
-  override def onStart(app: Application) {
-    ApplicationController.isHealthy = false
-
+  def startup() = {
     val numOfThread = Runtime.getRuntime.availableProcessors()
     val threadPool = Executors.newFixedThreadPool(numOfThread)
     val ec = ExecutionContext.fromExecutor(threadPool)
@@ -57,30 +55,38 @@ object Global extends WithFilters(new GzipFilter()) {
     s2parser = new RequestParser(s2graph.config) // merged config
     s2rest = new RestHandler(s2graph)(ec)
 
-    QueueActor.init(s2graph)
+    logger.info(s"starts with num of thread: $numOfThread, 
${threadPool.getClass.getSimpleName}")
+
+    config
+  }
 
-    if (Config.IS_WRITE_SERVER) {
-      ExceptionHandler.apply(config)
-    }
+  def shutdown() = {
+    s2graph.shutdown()
+  }
+
+  // Application entry point
+  override def onStart(app: Application) {
+    ApplicationController.isHealthy = false
+
+    val config = startup()
+    wallLogHandler = new ExceptionHandler(config)
+
+    QueueActor.init(s2graph, wallLogHandler)
 
     val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true)
     ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
 
     ApplicationController.isHealthy = defaultHealthOn
-    logger.info(s"starts with num of thread: $numOfThread, 
${threadPool.getClass.getSimpleName}")
   }
 
   override def onStop(app: Application) {
+    wallLogHandler.shutdown()
     QueueActor.shutdown()
 
-    if (Config.IS_WRITE_SERVER) {
-      ExceptionHandler.shutdown()
-    }
-
     /*
      * shutdown hbase client for flush buffers.
      */
-    s2graph.shutdown()
+    shutdown()
   }
 
   override def onError(request: RequestHeader, ex: Throwable): Future[Result] = {
@@ -98,3 +104,4 @@ object Global extends WithFilters(new GzipFilter()) {
     Future.successful(Results.BadRequest(error))
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
index 89931dc..d46d8d2 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
@@ -46,8 +46,8 @@ object QueueActor {
   var router: ActorRef = _
 
   //    Akka.system.actorOf(props(), name = "queueActor")
-  def init(s2: Graph) = {
-    router = Akka.system.actorOf(props(s2))
+  def init(s2: Graph, walLogHandler: ExceptionHandler) = {
+    router = Akka.system.actorOf(props(s2, walLogHandler))
   }
 
   def shutdown() = {
@@ -56,10 +56,10 @@ object QueueActor {
     Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2)
   }
 
-  def props(s2: Graph): Props = Props(classOf[QueueActor], s2)
+  def props(s2: Graph, walLogHandler: ExceptionHandler): Props = Props(classOf[QueueActor], s2, walLogHandler)
 }
 
-class QueueActor(s2: Graph) extends Actor with ActorLogging {
+class QueueActor(s2: Graph, walLogHandler: ExceptionHandler) extends Actor with ActorLogging {
 
   import Protocol._
 
@@ -79,7 +79,7 @@ class QueueActor(s2: Graph) extends Actor with ActorLogging {
     case element: GraphElement =>
 
       if (queueSize > maxQueueSize) {
-        ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None))
+        walLogHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None))
       } else {
         queueSize += 1L
         queue.enqueue(element)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
index 707b98c..b328c85 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -23,6 +23,7 @@ import akka.util.ByteString
 import org.apache.s2graph.core.GraphExceptions.BadQueryException
 import org.apache.s2graph.core.PostProcess
 import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.rest.play.config.Config
 import play.api.http.HttpEntity
 import play.api.libs.iteratee.Enumerator
 import play.api.libs.json.{JsString, JsValue}
@@ -33,6 +34,7 @@ import scala.concurrent.{ExecutionContext, Future}
 object ApplicationController extends Controller {
 
   var isHealthy = true
+  var isWriteFallbackHealthy = true
   var deployInfo = ""
   val applicationJsonHeader = "application/json"
 
@@ -65,6 +67,13 @@ object ApplicationController extends Controller {
     else NotFound
   }
 
+  def toKafkaTopic(isAsync: Boolean) = {
+    if (!isWriteFallbackHealthy) Config.KAFKA_FAIL_TOPIC
+    else {
+      if (isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+    }
+  }
+
   def jsonResponse(json: JsValue, headers: (String, String)*) =
     if (ApplicationController.isHealthy) {
       Ok(json).as(applicationJsonHeader).withHeaders(headers: _*)

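The new toKafkaTopic helper centralizes the WAL topic routing that EdgeController and VertexController previously inlined, with isWriteFallbackHealthy acting as a circuit breaker. Its decision table, restated as a sketch (edge stands for a parsed element at the call site; the Config constants are the ones referenced in this diff):

    // isWriteFallbackHealthy == false                  -> Config.KAFKA_FAIL_TOPIC
    // isWriteFallbackHealthy == true, isAsync == true  -> Config.KAFKA_LOG_TOPIC_ASYNC
    // isWriteFallbackHealthy == true, isAsync == false -> Config.KAFKA_LOG_TOPIC
    val topic = ApplicationController.toKafkaTopic(isAsync = edge.isAsync)
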
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index 96f7e60..53f3fce 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -47,6 +47,7 @@ object CounterController extends Controller {
   val config = Play.current.configuration.underlying
   val s2config = new S2CounterConfig(config)
 
+  private lazy val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
   private val exactCounterMap = Map(
     counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
     counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
@@ -739,7 +740,7 @@ object CounterController extends Controller {
 
           // produce to kafka
           // hash partitioner by key
-          ExceptionHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)))
+          walLogHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)))
 
           Ok(Json.toJson(
             Map(

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index ec8324c..5b2ef97 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -19,7 +19,6 @@
 
 package org.apache.s2graph.rest.play.controllers
 
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.core.rest.RequestParser
@@ -40,6 +39,7 @@ object EdgeController extends Controller {
 
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
+  private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
 
   private def jsToStr(js: JsValue): String = js match {
     case JsString(s) => s
@@ -69,10 +69,9 @@ object EdgeController extends Controller {
         val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
 
         for ((edge, orgJs) <- edges.zip(jsOrgs)) {
-          if (edge.isAsync)
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, Option(toTsv(orgJs, operation))))
-          else
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, Option(toTsv(orgJs, operation))))
+          val kafkaTopic = toKafkaTopic(edge.isAsync)
+          val kafkaMessage = ExceptionHandler.toKafkaMessage(kafkaTopic, edge, Option(toTsv(orgJs, operation)))
+          walLogHandler.enqueue(kafkaMessage)
         }
 
         val edgesToStore = edges.filterNot(e => e.isAsync)
@@ -108,11 +107,8 @@ object EdgeController extends Controller {
             case v: Vertex => vertexCnt += 1
             case e: Edge => edgeCnt += 1
           }
-          if (element.isAsync) {
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(str)))
-          } else {
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, element, Some(str)))
-          }
+          val kafkaTopic = toKafkaTopic(element.isAsync)
+          walLogHandler.enqueue(toKafkaMessage(kafkaTopic, element, Some(str)))
           element
         }
 
@@ -208,11 +204,10 @@ object EdgeController extends Controller {
           if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
         }
 
-        val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
-        kafkaMsg
+        ExceptionHandler.toKafkaMessage(topic, tsv)
       }
 
-      ExceptionHandler.enqueues(kafkaMessages)
+      kafkaMessages.foreach(walLogHandler.enqueue)
     }
 
    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
index 72f67c4..1a037ae 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -37,6 +37,7 @@ object PublishController extends Controller {
    */
   val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.")
 
+  private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
   //  private def toService(topic: String): String = {
   //    Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException)
   //  }
@@ -48,7 +49,7 @@ object PublishController extends Controller {
       val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str)
       //    val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, s"$str")
       //        logger.debug(s"$kafkaTopic, $str")
-      ExceptionHandler.enqueue(KafkaMessage(keyedMessage))
+      walLogHandler.enqueue(KafkaMessage(keyedMessage))
     })
     Future.successful(
       Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", 
"Keep-Alive" -> "timeout=10, max=10")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
index 2b15b5f..0fdbe43 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
@@ -32,6 +32,7 @@ import scala.concurrent.Future
 object VertexController extends Controller {
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
+  private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler
 
   import ApplicationController._
   import ExceptionHandler._
@@ -44,10 +45,8 @@ object VertexController extends Controller {
         val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)
 
         for (vertex <- vertices) {
-          if (vertex.isAsync)
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, vertex, None))
-          else
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, vertex, None))
+          val kafkaTopic = toKafkaTopic(vertex.isAsync)
+          walLogHandler.enqueue(toKafkaMessage(kafkaTopic, vertex, None))
         }
 
         //FIXME:
