This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 3b77ccc [GRIFFIN-316] Fix job exception handling
3b77ccc is described below
commit 3b77ccc018655b64ea12af5807f18f19dc642a17
Author: Yu <[email protected]>
AuthorDate: Wed Mar 11 23:15:19 2020 +0800
[GRIFFIN-316] Fix job exception handling
**What changes were proposed in this pull request?**
Currently we are using a Try instance to represent the result of a DQ job,
whether it succeeded or failed. But as we are only wrapping the Boolean result by
applying "Try" at the outermost level, the underlying failure could not be
caught and it would always return "Success" even if an exception occurred.
This change modifies all the underlying execute/doExecute methods of a DQ job
to handle exceptions with "Try" instances, so that failures can be propagated properly
to users when things go wrong.
**Does this PR introduce any user-facing change?**
No.
**How was this patch tested?**
Griffin test suite.
Author: Yu <[email protected]>
Closes #562 from PnPie/exception_catch.
---
.../org/apache/griffin/measure/job/DQJob.scala | 16 +++++---
.../griffin/measure/launch/batch/BatchDQApp.scala | 2 +-
.../org/apache/griffin/measure/step/DQStep.scala | 4 +-
.../apache/griffin/measure/step/SeqDQStep.scala | 13 ++++++-
.../griffin/measure/step/read/ReadStep.scala | 3 +-
.../step/transform/DataFrameOpsTransformStep.scala | 18 ++++-----
.../step/transform/SparkSqlTransformStep.scala | 18 ++++-----
.../measure/step/transform/TransformStep.scala | 42 +++++++++++++--------
.../step/write/DataSourceUpdateWriteStep.scala | 3 +-
.../measure/step/write/MetricFlushStep.scala | 4 +-
.../measure/step/write/MetricWriteStep.scala | 4 +-
.../measure/step/write/RecordWriteStep.scala | 3 +-
.../_profiling-batch-griffindsl_malformed.json | 43 ++++++++++++++++++++++
.../griffin/measure/job/BatchDQAppTest.scala | 17 +++++++++
.../org/apache/griffin/measure/job/DQAppTest.scala | 7 +++-
.../griffin/measure/step/TransformStepTest.scala | 5 ++-
16 files changed, 148 insertions(+), 54 deletions(-)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
index 4b19cd6..81e0867 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
@@ -17,16 +17,22 @@
package org.apache.griffin.measure.job
+import scala.util.{Failure, Success, Try}
+
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
case class DQJob(dqSteps: Seq[DQStep]) extends Serializable {
- /**
- * @return execution success
- */
- def execute(context: DQContext): Boolean = {
- dqSteps.forall(dqStep => dqStep.execute(context))
+ def execute(context: DQContext): Try[Boolean] = {
+ dqSteps
+ .map(_.execute(context))
+ .foldLeft(Try(true)) { (ret, stepResult) =>
+ (ret, stepResult) match {
+ case (Success(_), nextResult) => nextResult
+ case (Failure(_), _) => ret
+ }
+ }
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index b23dd93..dc1fb52 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -59,7 +59,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
GriffinUDFAgent.register(sparkSession)
}
- def run: Try[Boolean] = Try {
+ def run: Try[Boolean] = {
// start time
val startTime = new Date().getTime
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
index a6eb95a..0b4527c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.step
+import scala.util.Try
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.context.DQContext
@@ -27,7 +29,7 @@ trait DQStep extends Loggable {
/**
* @return execution success
*/
- def execute(context: DQContext): Boolean
+ def execute(context: DQContext): Try[Boolean]
def getNames: Seq[String] = name :: Nil
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
index 0eaea64..5ee5740 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.step
+import scala.util.{Failure, Success, Try}
+
import org.apache.griffin.measure.context.DQContext
/**
@@ -31,8 +33,15 @@ case class SeqDQStep(dqSteps: Seq[DQStep]) extends DQStep {
/**
* @return execution success
*/
- def execute(context: DQContext): Boolean = {
- dqSteps.forall(dqStep => dqStep.execute(context))
+ def execute(context: DQContext): Try[Boolean] = {
+ dqSteps
+ .map(_.execute(context))
+ .foldLeft(Try(true))((ret, stepResult) => {
+ (ret, stepResult) match {
+ case (Success(_), nextResult) => nextResult
+ case (Failure(_), _) => ret
+ }
+ })
}
override def getNames: Seq[String] = {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 11582d8..d358189 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,6 +18,7 @@
package org.apache.griffin.measure.step.read
import org.apache.spark.sql._
+import scala.util.Try
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
@@ -28,7 +29,7 @@ trait ReadStep extends DQStep {
val cache: Boolean
- def execute(context: DQContext): Boolean = {
+ def execute(context: DQContext): Try[Boolean] = Try {
info(s"read data source [$name]")
read(context) match {
case Some(df) =>
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 1b3fb33..1d06146 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.step.transform
+import scala.util.Try
+
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.write.WriteStep
@@ -32,14 +34,13 @@ case class DataFrameOpsTransformStep[T <: WriteStep](
cache: Boolean = false)
extends TransformStep {
- def doExecute(context: DQContext): Boolean = {
- val sparkSession = context.sparkSession
- try {
+ def doExecute(context: DQContext): Try[Boolean] =
+ Try {
+ val sparkSession = context.sparkSession
val df = rule match {
case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession,
inputDfName, details)
case DataFrameOps._accuracy =>
DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId,
details)
-
case DataFrameOps._clear => DataFrameOps.clear(sparkSession,
inputDfName, details)
case _ => throw new Exception(s"df opr [ $rule ] not supported")
}
@@ -47,13 +48,8 @@ case class DataFrameOpsTransformStep[T <: WriteStep](
context.runTimeTableRegister.registerTable(name, df)
writeStepOpt match {
case Some(writeStep) => writeStep.execute(context)
- case None => true
+ case None => Try(true)
}
- } catch {
- case e: Throwable =>
- error(s"run data frame ops [ $rule ] error: ${e.getMessage}", e)
- false
- }
- }
+ }.flatten
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index fc0306f..8b6e203 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.step.transform
+import scala.util.Try
+
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.write.WriteStep
@@ -30,21 +32,17 @@ case class SparkSqlTransformStep[T <: WriteStep](
writeStepOpt: Option[T] = None,
cache: Boolean = false)
extends TransformStep {
- def doExecute(context: DQContext): Boolean = {
- val sparkSession = context.sparkSession
- try {
+
+ def doExecute(context: DQContext): Try[Boolean] =
+ Try {
+ val sparkSession = context.sparkSession
val df = sparkSession.sql(rule)
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
writeStepOpt match {
case Some(writeStep) => writeStep.execute(context)
- case None => true
+ case None => Try(true)
}
- } catch {
- case e: Throwable =>
- error(s"run spark sql [ $rule ] error: ${e.getMessage}", e)
- false
- }
- }
+ }.flatten
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
index e2dfdd1..90c2cb0 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -18,10 +18,10 @@
package org.apache.griffin.measure.step.transform
import scala.collection.mutable
-import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.{DQStep, DQStepStatus}
@@ -40,41 +40,51 @@ trait TransformStep extends DQStep {
val parentSteps = new mutable.HashSet[TransformStep]
- def doExecute(context: DQContext): Boolean
+ def doExecute(context: DQContext): Try[Boolean]
+
+ def execute(context: DQContext): Try[Boolean] = {
- def execute(context: DQContext): Boolean = {
val threadName = Thread.currentThread().getName
info(threadName + " begin transform step : \n" + debugString())
+
// Submit parents Steps
val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map {
parentStep =>
Future {
val result = parentStep.execute(context)
parentStep.synchronized {
- if (result) {
- parentStep.status = COMPLETE
- } else {
- parentStep.status = FAILED
+ result match {
+ case Success(_) => parentStep.status = COMPLETE
+ case Failure(_) => parentStep.status = FAILED
}
}
+ result
}(TransformStep.transformStepContext)
}
- ThreadUtils.awaitResult(
+
+ val parentsResultSet = ThreadUtils.awaitResult(
Future.sequence(parentStepFutures)(implicitly,
TransformStep.transformStepContext),
Duration.Inf)
+ val parentsResult = parentsResultSet.foldLeft(Try(true)) { (ret, step) =>
+ (ret, step) match {
+ case (Success(_), nextResult) => nextResult
+ case (Failure(_), _) => ret
+ }
+ }
+
parentSteps.foreach(step => {
while (step.status == RUNNING) {
Thread.sleep(1000L)
}
})
- val prepared = parentSteps.forall(step => step.status == COMPLETE)
- if (prepared) {
- val res = doExecute(context)
- info(threadName + " end transform step : \n" + debugString())
- res
- } else {
- error("Parent transform step failed!")
- false
+
+ parentsResult match {
+ case Success(_) =>
+ info(threadName + " end transform step : \n" + debugString())
+ doExecute(context)
+ case Failure(_) =>
+ error("Parent transform step failed!")
+ parentsResult
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index c1af659..3faed1b 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,6 +19,7 @@ package org.apache.griffin.measure.step.write
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.DataFrame
+import scala.util.Try
import org.apache.griffin.measure.context.DQContext
@@ -30,7 +31,7 @@ case class DataSourceUpdateWriteStep(dsName: String,
inputName: String) extends
val name: String = ""
val writeTimestampOpt: Option[Long] = None
- def execute(context: DQContext): Boolean = {
+ def execute(context: DQContext): Try[Boolean] = Try {
getDataSourceCacheUpdateDf(context) match {
case Some(df) =>
context.dataSources
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 40754e2..ed4bc54 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.step.write
+import scala.util.Try
+
import org.apache.griffin.measure.context.DQContext
/**
@@ -28,7 +30,7 @@ case class MetricFlushStep() extends WriteStep {
val inputName: String = ""
val writeTimestampOpt: Option[Long] = None
- def execute(context: DQContext): Boolean = {
+ def execute(context: DQContext): Try[Boolean] = Try {
context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
val (t, metric) = pair
val pr = try {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index cdf337b..8f7f0c5 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.step.write
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.enums.{SimpleMode,
TimestampMode}
import org.apache.griffin.measure.configuration.enums.FlattenType.{
ArrayFlattenType,
@@ -42,7 +44,7 @@ case class MetricWriteStep(
val emptyMetricMap: Map[Long, Map[String, Any]] = Map[Long, Map[String,
Any]]()
val emptyMap: Map[String, Any] = Map[String, Any]()
- def execute(context: DQContext): Boolean = {
+ def execute(context: DQContext): Try[Boolean] = Try {
val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
// get metric list from data frame
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 01db3fe..975bdc5 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -19,6 +19,7 @@ package org.apache.griffin.measure.step.write
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
+import scala.util.Try
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
@@ -35,7 +36,7 @@ case class RecordWriteStep(
writeTimestampOpt: Option[Long] = None)
extends WriteStep {
- def execute(context: DQContext): Boolean = {
+ def execute(context: DQContext): Try[Boolean] = Try {
val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
val writeMode = writeTimestampOpt.map(_ =>
SimpleMode).getOrElse(context.writeMode)
diff --git
a/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
new file mode 100644
index 0000000..e8c72f9
--- /dev/null
+++ b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
@@ -0,0 +1,43 @@
+{
+ "name": "prof_batch",
+ "process.type": "batch",
+ "timestamp": 123456,
+ "data.sources": [
+ {
+ "name": "source",
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "dataframe.name": "this_table",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "spark-sql",
+ "rule": "select * from this_table where user_id < 10014"
+ }
+ ]
+ }
+ }
+ ],
+ "evaluate.rule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "profiling",
+ "out.dataframe.name": "prof",
+ "rule": "abc",
+ "out":[
+ {
+ "type": "metric",
+ "name": "prof",
+ "flatten": "array"
+ }
+ ]
+ }
+ ]
+ },
+
+ "sinks": ["CONSOLE"]
+}
\ No newline at end of file
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 633974c..a95d76e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -17,6 +17,8 @@
package org.apache.griffin.measure.job
+import org.apache.spark.sql.AnalysisException
+import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.Application.readParamFile
@@ -69,6 +71,15 @@ class BatchDQAppTest extends DQAppTest {
dqContext.metricWrapper.metrics should equal(expectedMetrics)
}
+ def runAndCheckException[T <: AnyRef](implicit classTag: ClassTag[T]): Unit
= {
+ dqApp.run match {
+ case Success(_) =>
+ fail(
+ s"job ${dqApp.dqParam.getName} should not succeed, a
${classTag.toString} exception is expected.")
+ case Failure(ex) => assertThrows[T](throw ex)
+ }
+ }
+
"accuracy batch job" should "work" in {
dqApp = initApp("/_accuracy-batch-griffindsl.json")
val expectedMetrics = Map(
@@ -139,4 +150,10 @@ class BatchDQAppTest extends DQAppTest {
runAndCheckResult(expectedMetrics)
}
+
+ "batch job" should "fail with exception caught due to invalid rules" in {
+ dqApp = initApp("/_profiling-batch-griffindsl_malformed.json")
+
+ runAndCheckException[AnalysisException]
+ }
}
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a05ade6..a557dda 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -43,7 +43,12 @@ class DQAppTest
var dqApp: DQApp = _
def getConfigFilePath(fileName: String): String = {
- getClass.getResource(fileName).getFile
+ try {
+ getClass.getResource(fileName).getFile
+ } catch {
+ case _: NullPointerException => throw new Exception(s"resource
[$fileName] not found")
+ case ex: Throwable => throw ex
+ }
}
def initApp(dqParamFile: String): DQApp = {
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 5a227a0..834d8e0 100644
---
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -18,6 +18,7 @@
package org.apache.griffin.measure.step
import org.scalatest._
+import scala.util.Try
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import
org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
@@ -34,7 +35,7 @@ class TransformStepTest extends FlatSpec with Matchers with
SparkSuiteBase with
cache: Boolean = false)
extends TransformStep {
- def doExecute(context: DQContext): Boolean = {
+ def doExecute(context: DQContext): Try[Boolean] = Try {
val threadName = Thread.currentThread().getName
info(s"Step $name started with $threadName")
Thread.sleep(duration * 1000L)
@@ -77,6 +78,6 @@ class TransformStepTest extends FlatSpec with Matchers with
SparkSuiteBase with
step5.parentSteps += step4
val context = getDqContext()
- step5.execute(context) should be(true)
+ step5.execute(context).get should be(true)
}
}