Repository: incubator-s2graph Updated Branches: refs/heads/master b58ba20b1 -> d37888ff2
[S2GRAPH-127] Refactor ExceptionHander Object into Class. - To publish all incoming requests(WALLOG) into kafka topic. - To publish failed mutations when Storage failed to mutate into backend storage. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6b5918b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6b5918b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6b5918b1 Branch: refs/heads/master Commit: 6b5918b1e21765164356ffb88f4b331a1257691a Parents: 1c2bd04 Author: daewon <[email protected]> Authored: Thu Apr 21 11:48:28 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu Nov 10 18:05:53 2016 +0900 ---------------------------------------------------------------------- .../apache/s2graph/core/ExceptionHandler.scala | 161 ++++++------------- .../apache/s2graph/core/storage/Storage.scala | 16 +- .../core/storage/hbase/AsynchbaseStorage.scala | 1 + .../apache/s2graph/rest/play/Bootstrap.scala | 35 ++-- .../s2graph/rest/play/actors/QueueActor.scala | 10 +- .../controllers/ApplicationController.scala | 9 ++ .../play/controllers/CounterController.scala | 3 +- .../rest/play/controllers/EdgeController.scala | 21 +-- .../play/controllers/PublishController.scala | 3 +- .../play/controllers/VertexController.scala | 7 +- 10 files changed, 108 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala index 29fc2dd..d03d483 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala @@ -20,75 +20,76 @@ package org.apache.s2graph.core import java.util.Properties -import java.util.concurrent.atomic.AtomicLong -import akka.actor._ -import akka.routing.{Broadcast, RoundRobinPool} import com.typesafe.config.Config import org.apache.kafka.clients.producer._ +import org.apache.s2graph.core.utils.logger -import scala.concurrent.duration._ +class ExceptionHandler(config: Config) { + import ExceptionHandler._ -object ExceptionHandler { + val keyBrokerList = "kafka.metadata.broker.list" + val phase = if (config.hasPath("phase")) config.getString("phase") else "dev" + val useKafka = config.hasPath(keyBrokerList) && config.getString(keyBrokerList) != "localhost" - var producer: Option[Producer[Key, Val]] = None - var properties: Option[Properties] = None - val numOfRoutees = 1 - val actorSystem = ActorSystem("ExceptionHandler") - var routees: Option[ActorRef] = None - var shutdownTime = 1000 millis - var phase = "dev" - lazy val failTopic = s"mutateFailed_${phase}" - - def apply(config: Config) = { - properties = - if (config.hasPath("kafka.metadata.broker.list")) Option(kafkaConfig(config)) - else None - phase = if (config.hasPath("phase")) config.getString("phase") else "dev" - producer = for { - props <- properties - p <- try { - Option(new KafkaProducer[Key, Val](props)) + val producer: Option[KafkaProducer[Key, Val]] = + if (useKafka) { + try { + Option(new KafkaProducer[Key, Val](toKafkaProp(config))) } catch { - case e: Throwable => None - } - } yield { - p + case e: Exception => + logger.error(s"Initialize kafka fail with: ${toKafkaProp(config)}") + None } - init() - } + } else None - def props(producer: Producer[Key, Val]) = Props(classOf[KafkaAggregatorActor], producer) - def init() = { - for { - p <- producer - } { - routees = Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p)))) + def enqueue(m: KafkaMessage): Unit = { + producer match { + case None => logger.debug(s"skip log to Kafka: ${m}") + case Some(kafka) => + kafka.send(m.msg, new Callback() { + override def onCompletion(meta: RecordMetadata, e: Exception) = { + if (e == null) { + // success + } else { + logger.error(s"log publish failed: ${m}", e) + // failure + } + } + }) } } - def shutdown() = { - routees.map(_ ! Broadcast(PoisonPill)) - Thread.sleep(shutdownTime.length) - } + def shutdown() = producer.foreach(_.close) +} - def enqueues(msgs: Seq[KafkaMessage]) = { - msgs.foreach(enqueue) +object ExceptionHandler { + type Key = String + type Val = String + + def toKafkaMessage(topic: String, element: GraphElement, originalString: Option[String] = None) = { + KafkaMessage( + new ProducerRecord[Key, Val]( + topic, + element.queuePartitionKey, + originalString.getOrElse(element.toLogString()))) } - def enqueue(msg: KafkaMessage) = { - routees.map(_ ! msg) + def toKafkaMessage(topic: String, tsv: String) = { + KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) } + case class KafkaMessage(msg: ProducerRecord[Key, Val]) - def kafkaConfig(config: Config) = { - val props = new Properties(); + private def toKafkaProp(config: Config) = { + val props = new Properties() /* all default configuration for new producer */ val brokers = if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list") else "localhost" + props.put("bootstrap.servers", brokers) props.put("acks", "1") props.put("buffer.memory", "33554432") @@ -103,76 +104,8 @@ object ExceptionHandler { props.put("block.on.buffer.full", "false") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") - props - } - - type Key = String - type Val = String - - def toKafkaMessage(topic: String = failTopic, element: GraphElement, originalString: Option[String] = None) = { - KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey, - originalString.getOrElse(element.toLogString()))) - } - - case class KafkaMessage(msg: ProducerRecord[Key, Val]) - - case class Message(topic: String, msg: String) - - case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], bufferSize: Int) - - case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: Int) - case object FlushBuffer - - case class UpdateHealth(isHealty: Boolean) - - case object ShowMetrics - -} - -class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends Stash with ActorLogging { - - import ExceptionHandler._ - - val failedCount = new AtomicLong(0L) - val successCount = new AtomicLong(0L) - val stashCount = new AtomicLong(0L) - - implicit val ex = context.system.dispatcher - - context.system.scheduler.schedule(0 millis, 10 seconds) { - self ! ShowMetrics + props } - override def receive = { - case ShowMetrics => - log.info(s"[Stats]: failed[${failedCount.get}], stashed[${stashCount.get}], success[${successCount.get}]") - - case m: KafkaMessage => - val replayTo = self - try { - kafkaProducer.send(m.msg, new Callback() { - override def onCompletion(meta: RecordMetadata, e: Exception) = { - if (e == null) { - // success - successCount.incrementAndGet() - unstashAll() - stashCount.set(0L) - } else { - // failure - log.error(s"onCompletion: $e", e) - failedCount.incrementAndGet() - replayTo ! m - } - } - }) - } catch { - case e@(_: org.apache.kafka.clients.producer.BufferExhaustedException | _: Throwable) => - log.error(s"$e", e) - log.info(s"stash") - stash() - stashCount.incrementAndGet() - } - - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 70e47a7..341f405 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -57,6 +57,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** retry scheduler */ val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() + /** handle mutate failed */ + val exceptionHandler = new ExceptionHandler(config) + val failTopic = s"mutateFailed_${config.getString("phase")}" /** @@ -247,7 +250,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * this method need to be called when client shutdown. this is responsible to cleanUp the resources * such as client into storage. */ - def flush(): Unit + def flush(): Unit = { + exceptionHandler.shutdown() + } /** * create table on storage. @@ -339,6 +344,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { strong ++ weak } } + def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq @@ -429,7 +435,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element = edge) - ExceptionHandler.enqueue(kafkaMessage) + exceptionHandler.enqueue(kafkaMessage) } Future.successful(false) @@ -946,12 +952,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { label <- labels } yield { val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") - val topic = ExceptionHandler.failTopic - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg + ExceptionHandler.toKafkaMessage(failTopic, tsv) } - ExceptionHandler.enqueues(kafkaMessages) + kafkaMessages.foreach(exceptionHandler.enqueue) } val requestTs = ts http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 783fd8a..7391259 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -365,6 +365,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte override def flush(): Unit = clients.foreach { client => + super.flush() val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS) Await.result(client.flush().toFuture, timeout) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala index 570c23c..30a5ee4 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala @@ -40,11 +40,9 @@ object Global extends WithFilters(new GzipFilter()) { var storageManagement: Management = _ var s2parser: RequestParser = _ var s2rest: RestHandler = _ + var wallLogHandler: ExceptionHandler = _ - // Application entry point - override def onStart(app: Application) { - ApplicationController.isHealthy = false - + def startup() = { val numOfThread = Runtime.getRuntime.availableProcessors() val threadPool = Executors.newFixedThreadPool(numOfThread) val ec = ExecutionContext.fromExecutor(threadPool) @@ -57,30 +55,38 @@ object Global extends WithFilters(new GzipFilter()) { s2parser = new RequestParser(s2graph.config) // merged config s2rest = new RestHandler(s2graph)(ec) - QueueActor.init(s2graph) + logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") + + config + } - if (Config.IS_WRITE_SERVER) { - ExceptionHandler.apply(config) - } + def shutdown() = { + s2graph.shutdown() + } + + // Application entry point + override def onStart(app: Application) { + ApplicationController.isHealthy = false + + val config = startup() + wallLogHandler = new ExceptionHandler(config) + + QueueActor.init(s2graph, wallLogHandler) val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true) ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get ApplicationController.isHealthy = defaultHealthOn - logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") } override def onStop(app: Application) { + wallLogHandler.shutdown() QueueActor.shutdown() - if (Config.IS_WRITE_SERVER) { - ExceptionHandler.shutdown() - } - /* * shutdown hbase client for flush buffers. */ - s2graph.shutdown() + shutdown() } override def onError(request: RequestHeader, ex: Throwable): Future[Result] = { @@ -98,3 +104,4 @@ object Global extends WithFilters(new GzipFilter()) { Future.successful(Results.BadRequest(error)) } } + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala index 89931dc..d46d8d2 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala @@ -46,8 +46,8 @@ object QueueActor { var router: ActorRef = _ // Akka.system.actorOf(props(), name = "queueActor") - def init(s2: Graph) = { - router = Akka.system.actorOf(props(s2)) + def init(s2: Graph, walLogHandler: ExceptionHandler) = { + router = Akka.system.actorOf(props(s2, walLogHandler)) } def shutdown() = { @@ -56,10 +56,10 @@ object QueueActor { Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2) } - def props(s2: Graph): Props = Props(classOf[QueueActor], s2) + def props(s2: Graph, walLogHandler: ExceptionHandler): Props = Props(classOf[QueueActor], s2, walLogHandler) } -class QueueActor(s2: Graph) extends Actor with ActorLogging { +class QueueActor(s2: Graph, walLogHandler: ExceptionHandler) extends Actor with ActorLogging { import Protocol._ @@ -79,7 +79,7 @@ class QueueActor(s2: Graph) extends Actor with ActorLogging { case element: GraphElement => if (queueSize > maxQueueSize) { - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None)) + walLogHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None)) } else { queueSize += 1L queue.enqueue(element) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala index 707b98c..b328c85 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala @@ -23,6 +23,7 @@ import akka.util.ByteString import org.apache.s2graph.core.GraphExceptions.BadQueryException import org.apache.s2graph.core.PostProcess import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.rest.play.config.Config import play.api.http.HttpEntity import play.api.libs.iteratee.Enumerator import play.api.libs.json.{JsString, JsValue} @@ -33,6 +34,7 @@ import scala.concurrent.{ExecutionContext, Future} object ApplicationController extends Controller { var isHealthy = true + var isWriteFallbackHealthy= true var deployInfo = "" val applicationJsonHeader = "application/json" @@ -65,6 +67,13 @@ object ApplicationController extends Controller { else NotFound } + def toKafkaTopic(isAsync: Boolean) = { + if (!isWriteFallbackHealthy) Config.KAFKA_FAIL_TOPIC + else { + if (isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC + } + } + def jsonResponse(json: JsValue, headers: (String, String)*) = if (ApplicationController.isHealthy) { Ok(json).as(applicationJsonHeader).withHeaders(headers: _*) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala index 96f7e60..53f3fce 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala @@ -47,6 +47,7 @@ object CounterController extends Controller { val config = Play.current.configuration.underlying val s2config = new S2CounterConfig(config) + private lazy val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler private val exactCounterMap = Map( counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)), counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config)) @@ -739,7 +740,7 @@ object CounterController extends Controller { // produce to kafka // hash partitioner by key - ExceptionHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg))) + walLogHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg))) Ok(Json.toJson( Map( http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index ec8324c..5b2ef97 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -19,7 +19,6 @@ package org.apache.s2graph.rest.play.controllers -import org.apache.kafka.clients.producer.ProducerRecord import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.core.rest.RequestParser @@ -40,6 +39,7 @@ object EdgeController extends Controller { private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser + private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler private def jsToStr(js: JsValue): String = js match { case JsString(s) => s @@ -69,10 +69,9 @@ object EdgeController extends Controller { val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation) for ((edge, orgJs) <- edges.zip(jsOrgs)) { - if (edge.isAsync) - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, Option(toTsv(orgJs, operation)))) - else - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, Option(toTsv(orgJs, operation)))) + val kafkaTopic = toKafkaTopic(edge.isAsync) + val kafkaMessage = ExceptionHandler.toKafkaMessage(kafkaTopic, edge, Option(toTsv(orgJs, operation))) + walLogHandler.enqueue(kafkaMessage) } val edgesToStore = edges.filterNot(e => e.isAsync) @@ -108,11 +107,8 @@ object EdgeController extends Controller { case v: Vertex => vertexCnt += 1 case e: Edge => edgeCnt += 1 } - if (element.isAsync) { - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(str))) - } else { - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, element, Some(str))) - } + val kafkaTopic = toKafkaTopic(element.isAsync) + walLogHandler.enqueue(toKafkaMessage(kafkaTopic, element, Some(str))) element } @@ -208,11 +204,10 @@ object EdgeController extends Controller { if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC } - val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) - kafkaMsg + ExceptionHandler.toKafkaMessage(topic, tsv) } - ExceptionHandler.enqueues(kafkaMessages) + kafkaMessages.foreach(walLogHandler.enqueue) } def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala index 72f67c4..1a037ae 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala @@ -37,6 +37,7 @@ object PublishController extends Controller { */ val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.") + private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler // private def toService(topic: String): String = { // Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException) // } @@ -48,7 +49,7 @@ object PublishController extends Controller { val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str) // val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, s"$str") // logger.debug(s"$kafkaTopic, $str") - ExceptionHandler.enqueue(KafkaMessage(keyedMessage)) + walLogHandler.enqueue(KafkaMessage(keyedMessage)) }) Future.successful( Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b5918b1/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala index 2b15b5f..0fdbe43 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala @@ -32,6 +32,7 @@ import scala.concurrent.Future object VertexController extends Controller { private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser + private val walLogHandler: ExceptionHandler = org.apache.s2graph.rest.play.Global.wallLogHandler import ApplicationController._ import ExceptionHandler._ @@ -44,10 +45,8 @@ object VertexController extends Controller { val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt) for (vertex <- vertices) { - if (vertex.isAsync) - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, vertex, None)) - else - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, vertex, None)) + val kafkaTopic = toKafkaTopic(vertex.isAsync) + walLogHandler.enqueue(toKafkaMessage(kafkaTopic, vertex, None)) } //FIXME:
