This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b77ccc  [GRIFFIN-316] Fix job exception handling
3b77ccc is described below

commit 3b77ccc018655b64ea12af5807f18f19dc642a17
Author: Yu <[email protected]>
AuthorDate: Wed Mar 11 23:15:19 2020 +0800

    [GRIFFIN-316] Fix job exception handling
    
    **What changes were proposed in this pull request?**
    
    Currently we are using a Try instance to represent the result of a DQ job, 
whether it succeeded or failed. But as we are only wrapping the Boolean result by 
applying "Try" at the outermost level, the underlying failure cannot be 
caught, and the job always returns "Success" even when an exception occurs.
    
    This change modifies all the underlying execute/doExecute methods of a DQ job 
to handle exceptions with "Try" instances, so that failures can be propagated 
properly to users when things go wrong.
    
    **Does this PR introduce any user-facing change?**
    No.
    
    **How was this patch tested?**
    Griffin test suite.
    
    Author: Yu <[email protected]>
    
    Closes #562 from PnPie/exception_catch.
---
 .../org/apache/griffin/measure/job/DQJob.scala     | 16 +++++---
 .../griffin/measure/launch/batch/BatchDQApp.scala  |  2 +-
 .../org/apache/griffin/measure/step/DQStep.scala   |  4 +-
 .../apache/griffin/measure/step/SeqDQStep.scala    | 13 ++++++-
 .../griffin/measure/step/read/ReadStep.scala       |  3 +-
 .../step/transform/DataFrameOpsTransformStep.scala | 18 ++++-----
 .../step/transform/SparkSqlTransformStep.scala     | 18 ++++-----
 .../measure/step/transform/TransformStep.scala     | 42 +++++++++++++--------
 .../step/write/DataSourceUpdateWriteStep.scala     |  3 +-
 .../measure/step/write/MetricFlushStep.scala       |  4 +-
 .../measure/step/write/MetricWriteStep.scala       |  4 +-
 .../measure/step/write/RecordWriteStep.scala       |  3 +-
 .../_profiling-batch-griffindsl_malformed.json     | 43 ++++++++++++++++++++++
 .../griffin/measure/job/BatchDQAppTest.scala       | 17 +++++++++
 .../org/apache/griffin/measure/job/DQAppTest.scala |  7 +++-
 .../griffin/measure/step/TransformStepTest.scala   |  5 ++-
 16 files changed, 148 insertions(+), 54 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala 
b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
index 4b19cd6..81e0867 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
@@ -17,16 +17,22 @@
 
 package org.apache.griffin.measure.job
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 
 case class DQJob(dqSteps: Seq[DQStep]) extends Serializable {
 
-  /**
-   * @return execution success
-   */
-  def execute(context: DQContext): Boolean = {
-    dqSteps.forall(dqStep => dqStep.execute(context))
+  def execute(context: DQContext): Try[Boolean] = {
+    dqSteps
+      .map(_.execute(context))
+      .foldLeft(Try(true)) { (ret, stepResult) =>
+        (ret, stepResult) match {
+          case (Success(_), nextResult) => nextResult
+          case (Failure(_), _) => ret
+        }
+      }
   }
 
 }
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index b23dd93..dc1fb52 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -59,7 +59,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
     GriffinUDFAgent.register(sparkSession)
   }
 
-  def run: Try[Boolean] = Try {
+  def run: Try[Boolean] = {
     // start time
     val startTime = new Date().getTime
 
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
index a6eb95a..0b4527c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step
 
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.context.DQContext
 
@@ -27,7 +29,7 @@ trait DQStep extends Loggable {
   /**
    * @return execution success
    */
-  def execute(context: DQContext): Boolean
+  def execute(context: DQContext): Try[Boolean]
 
   def getNames: Seq[String] = name :: Nil
 
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
index 0eaea64..5ee5740 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.griffin.measure.context.DQContext
 
 /**
@@ -31,8 +33,15 @@ case class SeqDQStep(dqSteps: Seq[DQStep]) extends DQStep {
   /**
    * @return execution success
    */
-  def execute(context: DQContext): Boolean = {
-    dqSteps.forall(dqStep => dqStep.execute(context))
+  def execute(context: DQContext): Try[Boolean] = {
+    dqSteps
+      .map(_.execute(context))
+      .foldLeft(Try(true))((ret, stepResult) => {
+        (ret, stepResult) match {
+          case (Success(_), nextResult) => nextResult
+          case (Failure(_), _) => ret
+        }
+      })
   }
 
   override def getNames: Seq[String] = {
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 11582d8..d358189 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,6 +18,7 @@
 package org.apache.griffin.measure.step.read
 
 import org.apache.spark.sql._
+import scala.util.Try
 
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
@@ -28,7 +29,7 @@ trait ReadStep extends DQStep {
 
   val cache: Boolean
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     info(s"read data source [$name]")
     read(context) match {
       case Some(df) =>
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 1b3fb33..1d06146 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.transform
 
+import scala.util.Try
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.write.WriteStep
 
@@ -32,14 +34,13 @@ case class DataFrameOpsTransformStep[T <: WriteStep](
     cache: Boolean = false)
     extends TransformStep {
 
-  def doExecute(context: DQContext): Boolean = {
-    val sparkSession = context.sparkSession
-    try {
+  def doExecute(context: DQContext): Try[Boolean] =
+    Try {
+      val sparkSession = context.sparkSession
       val df = rule match {
         case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession, 
inputDfName, details)
         case DataFrameOps._accuracy =>
           DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId, 
details)
-
         case DataFrameOps._clear => DataFrameOps.clear(sparkSession, 
inputDfName, details)
         case _ => throw new Exception(s"df opr [ $rule ] not supported")
       }
@@ -47,13 +48,8 @@ case class DataFrameOpsTransformStep[T <: WriteStep](
       context.runTimeTableRegister.registerTable(name, df)
       writeStepOpt match {
         case Some(writeStep) => writeStep.execute(context)
-        case None => true
+        case None => Try(true)
       }
-    } catch {
-      case e: Throwable =>
-        error(s"run data frame ops [ $rule ] error: ${e.getMessage}", e)
-        false
-    }
-  }
+    }.flatten
 
 }
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index fc0306f..8b6e203 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.transform
 
+import scala.util.Try
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.write.WriteStep
 
@@ -30,21 +32,17 @@ case class SparkSqlTransformStep[T <: WriteStep](
     writeStepOpt: Option[T] = None,
     cache: Boolean = false)
     extends TransformStep {
-  def doExecute(context: DQContext): Boolean = {
-    val sparkSession = context.sparkSession
-    try {
+
+  def doExecute(context: DQContext): Try[Boolean] =
+    Try {
+      val sparkSession = context.sparkSession
       val df = sparkSession.sql(rule)
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
       context.runTimeTableRegister.registerTable(name, df)
       writeStepOpt match {
         case Some(writeStep) => writeStep.execute(context)
-        case None => true
+        case None => Try(true)
       }
-    } catch {
-      case e: Throwable =>
-        error(s"run spark sql [ $rule ] error: ${e.getMessage}", e)
-        false
-    }
-  }
+    }.flatten
 
 }
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
index e2dfdd1..90c2cb0 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -18,10 +18,10 @@
 package org.apache.griffin.measure.step.transform
 
 import scala.collection.mutable
-import scala.collection.mutable.HashSet
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
 
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.{DQStep, DQStepStatus}
@@ -40,41 +40,51 @@ trait TransformStep extends DQStep {
 
   val parentSteps = new mutable.HashSet[TransformStep]
 
-  def doExecute(context: DQContext): Boolean
+  def doExecute(context: DQContext): Try[Boolean]
+
+  def execute(context: DQContext): Try[Boolean] = {
 
-  def execute(context: DQContext): Boolean = {
     val threadName = Thread.currentThread().getName
     info(threadName + " begin transform step : \n" + debugString())
+
     // Submit parents Steps
     val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { 
parentStep =>
       Future {
         val result = parentStep.execute(context)
         parentStep.synchronized {
-          if (result) {
-            parentStep.status = COMPLETE
-          } else {
-            parentStep.status = FAILED
+          result match {
+            case Success(_) => parentStep.status = COMPLETE
+            case Failure(_) => parentStep.status = FAILED
           }
         }
+        result
       }(TransformStep.transformStepContext)
     }
-    ThreadUtils.awaitResult(
+
+    val parentsResultSet = ThreadUtils.awaitResult(
       Future.sequence(parentStepFutures)(implicitly, 
TransformStep.transformStepContext),
       Duration.Inf)
 
+    val parentsResult = parentsResultSet.foldLeft(Try(true)) { (ret, step) =>
+      (ret, step) match {
+        case (Success(_), nextResult) => nextResult
+        case (Failure(_), _) => ret
+      }
+    }
+
     parentSteps.foreach(step => {
       while (step.status == RUNNING) {
         Thread.sleep(1000L)
       }
     })
-    val prepared = parentSteps.forall(step => step.status == COMPLETE)
-    if (prepared) {
-      val res = doExecute(context)
-      info(threadName + " end transform step : \n" + debugString())
-      res
-    } else {
-      error("Parent transform step failed!")
-      false
+
+    parentsResult match {
+      case Success(_) =>
+        info(threadName + " end transform step : \n" + debugString())
+        doExecute(context)
+      case Failure(_) =>
+        error("Parent transform step failed!")
+        parentsResult
     }
   }
 
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index c1af659..3faed1b 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,6 +19,7 @@ package org.apache.griffin.measure.step.write
 
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.DataFrame
+import scala.util.Try
 
 import org.apache.griffin.measure.context.DQContext
 
@@ -30,7 +31,7 @@ case class DataSourceUpdateWriteStep(dsName: String, 
inputName: String) extends
   val name: String = ""
   val writeTimestampOpt: Option[Long] = None
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     getDataSourceCacheUpdateDf(context) match {
       case Some(df) =>
         context.dataSources
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 40754e2..ed4bc54 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.write
 
+import scala.util.Try
+
 import org.apache.griffin.measure.context.DQContext
 
 /**
@@ -28,7 +30,7 @@ case class MetricFlushStep() extends WriteStep {
   val inputName: String = ""
   val writeTimestampOpt: Option[Long] = None
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
       val pr = try {
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index cdf337b..8f7f0c5 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.write
 
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.enums.{SimpleMode, 
TimestampMode}
 import org.apache.griffin.measure.configuration.enums.FlattenType.{
   ArrayFlattenType,
@@ -42,7 +44,7 @@ case class MetricWriteStep(
   val emptyMetricMap: Map[Long, Map[String, Any]] = Map[Long, Map[String, 
Any]]()
   val emptyMap: Map[String, Any] = Map[String, Any]()
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
 
     // get metric list from data frame
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 01db3fe..975bdc5 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -19,6 +19,7 @@ package org.apache.griffin.measure.step.write
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
+import scala.util.Try
 
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
@@ -35,7 +36,7 @@ case class RecordWriteStep(
     writeTimestampOpt: Option[Long] = None)
     extends WriteStep {
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
 
     val writeMode = writeTimestampOpt.map(_ => 
SimpleMode).getOrElse(context.writeMode)
diff --git 
a/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json 
b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
new file mode 100644
index 0000000..e8c72f9
--- /dev/null
+++ b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
@@ -0,0 +1,43 @@
+{
+  "name": "prof_batch",
+  "process.type": "batch",
+  "timestamp": 123456,
+  "data.sources": [
+    {
+      "name": "source",
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "dataframe.name": "this_table",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "spark-sql",
+            "rule": "select * from this_table where user_id < 10014"
+          }
+        ]
+      }
+    }
+  ],
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "out.dataframe.name": "prof",
+        "rule": "abc",
+        "out":[
+          {
+            "type": "metric",
+            "name": "prof",
+            "flatten": "array"
+          }
+        ]
+      }
+    ]
+  },
+
+  "sinks": ["CONSOLE"]
+}
\ No newline at end of file
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala 
b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 633974c..a95d76e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.job
 
+import org.apache.spark.sql.AnalysisException
+import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}
 
 import org.apache.griffin.measure.Application.readParamFile
@@ -69,6 +71,15 @@ class BatchDQAppTest extends DQAppTest {
     dqContext.metricWrapper.metrics should equal(expectedMetrics)
   }
 
+  def runAndCheckException[T <: AnyRef](implicit classTag: ClassTag[T]): Unit 
= {
+    dqApp.run match {
+      case Success(_) =>
+        fail(
+          s"job ${dqApp.dqParam.getName} should not succeed, a 
${classTag.toString} exception is expected.")
+      case Failure(ex) => assertThrows[T](throw ex)
+    }
+  }
+
   "accuracy batch job" should "work" in {
     dqApp = initApp("/_accuracy-batch-griffindsl.json")
     val expectedMetrics = Map(
@@ -139,4 +150,10 @@ class BatchDQAppTest extends DQAppTest {
 
     runAndCheckResult(expectedMetrics)
   }
+
+  "batch job" should "fail with exception caught due to invalid rules" in {
+    dqApp = initApp("/_profiling-batch-griffindsl_malformed.json")
+
+    runAndCheckException[AnalysisException]
+  }
 }
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala 
b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a05ade6..a557dda 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -43,7 +43,12 @@ class DQAppTest
   var dqApp: DQApp = _
 
   def getConfigFilePath(fileName: String): String = {
-    getClass.getResource(fileName).getFile
+    try {
+      getClass.getResource(fileName).getFile
+    } catch {
+      case _: NullPointerException => throw new Exception(s"resource 
[$fileName] not found")
+      case ex: Throwable => throw ex
+    }
   }
 
   def initApp(dqParamFile: String): DQApp = {
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 5a227a0..834d8e0 100644
--- 
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -18,6 +18,7 @@
 package org.apache.griffin.measure.step
 
 import org.scalatest._
+import scala.util.Try
 
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import 
org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
@@ -34,7 +35,7 @@ class TransformStepTest extends FlatSpec with Matchers with 
SparkSuiteBase with
       cache: Boolean = false)
       extends TransformStep {
 
-    def doExecute(context: DQContext): Boolean = {
+    def doExecute(context: DQContext): Try[Boolean] = Try {
       val threadName = Thread.currentThread().getName
       info(s"Step $name started with $threadName")
       Thread.sleep(duration * 1000L)
@@ -77,6 +78,6 @@ class TransformStepTest extends FlatSpec with Matchers with 
SparkSuiteBase with
     step5.parentSteps += step4
 
     val context = getDqContext()
-    step5.execute(context) should be(true)
+    step5.execute(context).get should be(true)
   }
 }

Reply via email to