Repository: incubator-griffin Updated Branches: refs/heads/master 0b100a16c -> 60e23edc9
persist try Author: Lionel Liu <[email protected]> Closes #168 from bhlx3lyx7/docker. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/60e23edc Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/60e23edc Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/60e23edc Branch: refs/heads/master Commit: 60e23edc96656f7382b74956bfb057e3eee9dda6 Parents: 0b100a1 Author: Lionel Liu <[email protected]> Authored: Fri Nov 3 18:01:22 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Fri Nov 3 18:01:22 2017 +0800 ---------------------------------------------------------------------- .../griffin/measure/persist/HdfsPersist.scala | 12 +++++--- .../griffin/measure/persist/MultiPersists.scala | 30 ++++++++++++++++++-- 2 files changed, 35 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60e23edc/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala index 431fe10..61d0cde 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala @@ -184,8 +184,12 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: // } private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = { - val recStr = records.mkString("\n") - HdfsUtil.writeContent(hdfsPath, recStr) + try { + val recStr = records.mkString("\n") + HdfsUtil.writeContent(hdfsPath, recStr) + } catch { + case e: Throwable => error(e.getMessage) + } } def log(rt: Long, msg: String): Unit = { @@ -276,9 +280,9 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: // } def persistMetrics(metrics: Map[String, Any]): Unit = { - val json = JsonUtil.toJson(metrics) try { - info(s"${json}") + val json = JsonUtil.toJson(metrics) + println(s"hdfs persist metrics: ${json}") persistRecords(MetricsFile, json :: Nil) } catch { case e: Throwable => error(e.getMessage) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60e23edc/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala index 0b7c98c..d698bb0 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala @@ -48,11 +48,35 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist { // def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) } // def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) } - def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) } + def log(rt: Long, msg: String): Unit = { + persists.foreach { persist => + try { + persist.log(rt, msg) + } catch { + case e: Throwable => error(s"log error: ${e.getMessage}") + } + } + } // def persistRecords(df: DataFrame, name: String): Unit = { persists.foreach(_.persistRecords(df, name)) } - def persistRecords(records: Iterable[String], name: String): Unit = { persists.foreach(_.persistRecords(records, name)) } + def persistRecords(records: Iterable[String], name: String): Unit = { + persists.foreach { persist => + try { + persist.persistRecords(records, name) + } catch { + case e: Throwable => error(s"persist records error: ${e.getMessage}") + } + } + } // def persistMetrics(metrics: Seq[String], name: String): Unit = { persists.foreach(_.persistMetrics(metrics, name)) } - def persistMetrics(metrics: Map[String, Any]): Unit = { persists.foreach(_.persistMetrics(metrics)) } + def persistMetrics(metrics: Map[String, Any]): Unit = { + persists.foreach { persist => + try { + persist.persistMetrics(metrics) + } catch { + case e: Throwable => error(s"persist metrics error: ${e.getMessage}") + } + } + } }
