handle json parse exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e607fc9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e607fc9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e607fc9b

Branch: refs/heads/feature/test_daewon
Commit: e607fc9b704ae30d0040dabff62c28ae68150941
Parents: d358931
Author: Jaesang Kim <[email protected]>
Authored: Wed Dec 23 11:24:17 2015 +0900
Committer: Jaesang Kim <[email protected]>
Committed: Wed Dec 23 11:25:12 2015 +0900

----------------------------------------------------------------------
 .../counter/core/v2/RankingStorageGraph.scala   | 41 ++++++++++++++++++++
 .../s2/counter/core/CounterFunctions.scala      | 12 ++++-
 2 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e607fc9b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
index b0c0a41..b12efbd 100644
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
+++ b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
@@ -207,6 +207,46 @@ class RankingStorageGraph(config: Config) extends RankingStorage {
 
   private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = {
     val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
+//    val ids = (0 until BUCKET_SHARD_COUNT).map { idx =>
+//      s"${makeBucketShardKey(idx, key)}"
+//    }
+//
+//    val payload = Json.obj(
+//      "srcVertices" -> Json.arr(
+//        Json.obj(
+//          "serviceName" -> SERVICE_NAME,
+//          "columnName" -> BUCKET_COLUMN_NAME,
+//          "ids" -> ids
+//        )
+//      ),
+//      "steps" -> Json.arr(
+//        Json.obj(
+//          "step" -> Json.arr(
+//            Json.obj(
+//              "label" -> labelName,
+//              "duplicate" -> duplicate,
+//              "direction" -> "out",
+//              "offset" -> 0,
+//              "limit" -> -1,
+//              "interval" -> Json.obj(
+//                "from" -> Json.obj(
+//                  "time_unit" -> key.eq.tq.q.toString,
+//                  "time_value" -> key.eq.tq.ts
+//                ),
+//                "to" -> Json.obj(
+//                  "time_unit" -> key.eq.tq.q.toString,
+//                  "time_value" -> key.eq.tq.ts
+//                ),
+//                "scoring" -> Json.obj(
+//                  "score" -> 1
+//                )
+//              )
+//            )
+//          )
+//        )
+//      )
+//    )
+
     val ids = {
       (0 until BUCKET_SHARD_COUNT).map { shardIdx =>
         s""""${makeBucketShardKey(shardIdx, key)}""""
@@ -246,6 +286,7 @@ class RankingStorageGraph(config: Config) extends RankingStorage {
 
     log.debug(strJs)
     val payload = Json.parse(strJs)
+
     wsClient.url(s"$s2graphUrl/graphs/getEdges").post(payload).map { resp =>
       resp.status match {
         case HttpStatus.SC_OK =>
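
A note on the payload construction touched above: the commented-out code built the getEdges request with Json.obj, while the code that remains builds the body as an interpolated string and feeds it to Json.parse, which throws when the string is malformed. The sketch below only illustrates that trade-off, assuming play-json on the classpath; PayloadBuildSketch, the field values ("bucket", "my_service", the ids) and the trimmed-down body shape are made up for illustration, not s2graph's actual configuration or API.

  import play.api.libs.json.{JsValue, Json}
  import scala.util.Try

  object PayloadBuildSketch extends App {
    // Json.obj (the commented-out style) cannot emit malformed JSON:
    // quoting and escaping are handled by play-json.
    def viaJsonObj(serviceName: String, ids: Seq[String]): JsValue =
      Json.obj(
        "srcVertices" -> Json.arr(
          Json.obj("serviceName" -> serviceName, "columnName" -> "bucket", "ids" -> ids)
        )
      )

    // An interpolated template (the style kept in getEdges) is only as good as
    // the template itself; Json.parse throws when the result is malformed.
    def viaString(serviceName: String, ids: Seq[String]): Try[JsValue] = {
      val idsJs = ids.map(id => s""""$id"""").mkString(",")
      val strJs =
        s"""{
           |  "srcVertices": [
           |    { "serviceName": "$serviceName", "columnName": "bucket", "ids": [$idsJs] }
           |  ]
           |}""".stripMargin
      Try(Json.parse(strJs))
    }

    println(viaJsonObj("my_service", Seq("0.key", "1.key")))
    println(viaString("my_service", Seq("0.key", "1.key")).isSuccess) // true
    // A stray quote breaks the hand-built template but not Json.obj:
    println(viaString("my_\"service", Seq("0.key")).isSuccess)        // false
  }
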
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e607fc9b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
index 32e3d0c..cc2e54b 100644
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
+++ b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
@@ -128,6 +128,12 @@ object CounterFunctions extends Logging with WithKafka {
       itemRankingRdd.unpersist(false)
     }
   }
+
+  private def parseLine(line: String): Option[TrxLog] = Try {
+    val js = Json.parse(line)
+    js.toString()
+    js.as[TrxLog]
+  }.toOption
 
   def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
     rdd.mapPartitions { part =>
@@ -135,10 +141,8 @@ object CounterFunctions extends Logging with WithKafka {
       for {
         (k, v) <- part
         line <- GraphUtil.parseString(v)
-        trxLog = Json.parse(line).as[TrxLog] if trxLog.success
-      } yield {
-        trxLog
-      }
+        trxLog <- parseLine(line) if trxLog.success
+      } yield trxLog
     }
   }
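
The CounterFunctions change is the actual "handle json parse exception" part: instead of calling Json.parse(line).as[TrxLog] inside the for-comprehension, where one bad Kafka record failed the whole partition, the new parseLine wraps the parse in Try and returns an Option, so unparsable lines are simply dropped. Below is a minimal, Spark-free sketch of the same pattern; the two-field TrxLog, its Reads, and the sample lines are simplified stand-ins, not s2graph's real model. The committed parseLine also calls js.toString() before js.as[TrxLog], but its result is discarded, so the sketch leaves it out.

  import play.api.libs.json.{Json, Reads}
  import scala.util.Try

  object ParseLineSketch extends App {
    // Simplified stand-in for s2counter's TrxLog; the real class has more fields.
    case class TrxLog(success: Boolean, action: String)
    implicit val trxLogReads: Reads[TrxLog] = Json.reads[TrxLog]

    // Same shape as the parseLine added in the commit: a failed parse (or a
    // document that does not match TrxLog) becomes None instead of an exception.
    def parseLine(line: String): Option[TrxLog] =
      Try(Json.parse(line).as[TrxLog]).toOption

    val lines = Seq(
      """{"success": true, "action": "like"}""",   // parses, passes the guard
      """{"success": false, "action": "like"}""",  // parses, filtered by the guard
      """not json at all"""                        // previously threw; now skipped
    )

    // Mirrors the new for-comprehension in makeTrxLogRdd, minus Spark.
    val trxLogs = for {
      line <- lines
      trxLog <- parseLine(line) if trxLog.success
    } yield trxLog

    println(trxLogs) // List(TrxLog(true,like))
  }
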
