Repository: incubator-griffin
Updated Branches:
  refs/heads/master 0b100a16c -> 60e23edc9


persist try

Author: Lionel Liu <[email protected]>

Closes #168 from bhlx3lyx7/docker.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/60e23edc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/60e23edc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/60e23edc

Branch: refs/heads/master
Commit: 60e23edc96656f7382b74956bfb057e3eee9dda6
Parents: 0b100a1
Author: Lionel Liu <[email protected]>
Authored: Fri Nov 3 18:01:22 2017 +0800
Committer: Lionel Liu <[email protected]>
Committed: Fri Nov 3 18:01:22 2017 +0800

----------------------------------------------------------------------
 .../griffin/measure/persist/HdfsPersist.scala   | 12 +++++---
 .../griffin/measure/persist/MultiPersists.scala | 30 ++++++++++++++++++--
 2 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60e23edc/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
index 431fe10..61d0cde 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
@@ -184,8 +184,12 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp:
 //  }
 
   private def persistRecords(hdfsPath: String, records: Iterable[String]): 
Unit = {
-    val recStr = records.mkString("\n")
-    HdfsUtil.writeContent(hdfsPath, recStr)
+    try {
+      val recStr = records.mkString("\n")
+      HdfsUtil.writeContent(hdfsPath, recStr)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
   }
 
   def log(rt: Long, msg: String): Unit = {
@@ -276,9 +280,9 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp:
 //  }
 
   def persistMetrics(metrics: Map[String, Any]): Unit = {
-    val json = JsonUtil.toJson(metrics)
     try {
-      info(s"${json}")
+      val json = JsonUtil.toJson(metrics)
+      println(s"hdfs persist metrics: ${json}")
       persistRecords(MetricsFile, json :: Nil)
     } catch {
       case e: Throwable => error(e.getMessage)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/60e23edc/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index 0b7c98c..d698bb0 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -48,11 +48,35 @@ case class MultiPersists(persists: Iterable[Persist]) 
extends Persist {
 //  def missRecords(records: RDD[String]): Unit = { 
persists.foreach(_.missRecords(records)) }
 //  def matchRecords(records: RDD[String]): Unit = { 
persists.foreach(_.matchRecords(records)) }
 
-  def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
+  def log(rt: Long, msg: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.log(rt, msg)
+      } catch {
+        case e: Throwable => error(s"log error: ${e.getMessage}")
+      }
+    }
+  }
 
 //  def persistRecords(df: DataFrame, name: String): Unit = { 
persists.foreach(_.persistRecords(df, name)) }
-  def persistRecords(records: Iterable[String], name: String): Unit = { 
persists.foreach(_.persistRecords(records, name)) }
+  def persistRecords(records: Iterable[String], name: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistRecords(records, name)
+      } catch {
+        case e: Throwable => error(s"persist records error: ${e.getMessage}")
+      }
+    }
+  }
 //  def persistMetrics(metrics: Seq[String], name: String): Unit = { 
persists.foreach(_.persistMetrics(metrics, name)) }
-  def persistMetrics(metrics: Map[String, Any]): Unit = { 
persists.foreach(_.persistMetrics(metrics)) }
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistMetrics(metrics)
+      } catch {
+        case e: Throwable => error(s"persist metrics error: ${e.getMessage}")
+      }
+    }
+  }
 
 }

Reply via email to