This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 557e45c  [GRIFFIN-272] Abort the application if the data source fails to load data.
557e45c is described below

commit 557e45cdbf8c2ab11c278a18eaf5fde8b686e77e
Author: wankunde <[email protected]>
AuthorDate: Mon Jul 29 21:49:35 2019 +0800

    [GRIFFIN-272] Abort the application if the data source fails to load data.
    
    If the data source fails to load data, I think the current DQ app should
    not continue to run: the remaining compute steps are unnecessary.
    
    Author: wankunde <[email protected]>
    
    Closes #517 from wankunde/datasource.
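    
    In effect, loadData moves from log-and-continue to log-and-rethrow. A
    minimal sketch of that control flow, using simplified stand-ins for
    Griffin's DQContext, DataFrame, and TimeRange types rather than the
    real API:
    
    // Hypothetical stand-ins; only the try/rethrow shape mirrors the commit.
    case class TimeRange(begin: Long, end: Long)
    
    class SketchDataSource(name: String) {
      // Stand-in for the connector call; throws to simulate a broken source.
      private def data(ts: Long): (Option[String], TimeRange) =
        throw new RuntimeException(s"cannot read source [$name]")
    
      def loadData(ts: Long): TimeRange =
        try {
          val (dfOpt, timeRange) = data(ts)
          dfOpt match {
            case Some(df) => println(s"registered table [$name] for $df")
            case None     => println(s"Data source [$name] is null!")
          }
          timeRange
        } catch {
          case e: Throwable =>
            Console.err.println(s"load data source [$name] fails")
            throw e // rethrow so the DQ application aborts instead of running on
        }
    }
    
    object FailFastDemo extends App {
      // Before this commit a load failure was only logged and the run went
      // on; now the rethrown exception stops the job at this call.
      new SketchDataSource("demo").loadData(System.currentTimeMillis())
    }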
---
 .../griffin/measure/datasource/DataSource.scala    | 22 ++++++++++++++--------
 .../connector/batch/AvroBatchDataConnector.scala   |  7 ++-----
 .../connector/batch/HiveBatchDataConnector.scala   |  6 +-----
 .../batch/TextDirBatchDataConnector.scala          |  7 ++-----
 4 files changed, 19 insertions(+), 23 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index f2cd0ec..fd94e9d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -48,15 +48,21 @@ case class DataSource(name: String,
 
   def loadData(context: DQContext): TimeRange = {
     info(s"load data [${name}]")
-    val timestamp = context.contextId.timestamp
-    val (dfOpt, timeRange) = data(timestamp)
-    dfOpt match {
-      case Some(df) =>
-        context.runTimeTableRegister.registerTable(name, df)
-      case None =>
-        warn(s"load data source [${name}] fails")
+    try {
+      val timestamp = context.contextId.timestamp
+      val (dfOpt, timeRange) = data(timestamp)
+      dfOpt match {
+        case Some(df) =>
+          context.runTimeTableRegister.registerTable(name, df)
+        case None =>
+          warn(s"Data source [${name}] is null!")
+      }
+      timeRange
+    } catch {
+      case e: Throwable =>
+        error(s"load data source [${name}] fails")
+        throw e
     }
-    timeRange
   }
 
   private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
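
A side note on the catch above: matching on Throwable also intercepts fatal
JVM errors (e.g. OutOfMemoryError) before rethrowing them. A common Scala
variant, sketched here as an alternative rather than what this commit does,
is scala.util.control.NonFatal, which lets fatal errors propagate untouched:

    import scala.util.control.NonFatal

    // Hypothetical helper, not Griffin code: log-and-rethrow for ordinary
    // exceptions only; fatal errors bypass the handler entirely.
    def loadOrAbort[T](name: String)(body: => T): T =
      try body
      catch {
        case NonFatal(e) =>
          Console.err.println(s"load data source [$name] fails")
          throw e // still rethrow: the application aborts either way
      }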
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 09c96d5..457eaa9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -53,15 +53,12 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
   }
 
   def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
+    assert(fileExist(), s"Avro file ${concreteFileFullPath} does not exist!")
+    val dfOpt = {
       val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
       val dfOpt = Some(df)
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load avro file ${concreteFileFullPath} fails", e)
-        None
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
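
The connector now fails fast with an up-front existence check instead of
swallowing the load error. A hedged sketch of that assert-as-precondition
style; the path and fileExist helper below are hypothetical examples, not
Griffin's real members:

    import java.nio.file.{Files, Paths}

    object AssertPreconditionDemo extends App {
      val concreteFileFullPath = "/tmp/users.avro" // hypothetical input path

      def fileExist(): Boolean = Files.exists(Paths.get(concreteFileFullPath))

      // Scala's assert throws java.lang.AssertionError on failure; since
      // loadData now rethrows, a missing file aborts the run instead of
      // silently yielding None.
      assert(fileExist(), s"Avro file $concreteFileFullPath does not exist!")
      println("file present, safe to load")
    }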
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index 91ab07d..a7926b2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -47,17 +47,13 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
   val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
 
   def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
+    val dfOpt = {
       val dtSql = dataSql
       info(dtSql)
       val df = sparkSession.sql(dtSql)
       val dfOpt = Some(df)
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
-        None
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
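
For the Hive connector there is no cheap existence probe, so the change
simply lets sparkSession.sql fail loudly. A sketch of how such a failure now
surfaces; the local session setup and table name are hypothetical:

    import org.apache.spark.sql.SparkSession

    object HivePropagationDemo extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("demo")
        .getOrCreate()

      // Throws AnalysisException because the table is unresolved; before
      // this commit the connector caught it and returned None, after it the
      // exception reaches DataSource.loadData, which logs and rethrows.
      spark.sql("SELECT * FROM nonexistent_table").show()
    }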
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index e21335e..85b4774 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -53,7 +53,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
   }
 
   def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
+    assert(dirExist(), s"Text dir ${dirPath} does not exist!")
+    val dfOpt = {
       val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
       // touch done file for read dirs
       dataDirs.foreach(dir => touchDone(dir))
@@ -68,10 +69,6 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
       } else {
         None
       }
-    } catch {
-      case e: Throwable =>
-        error(s"load text dir ${dirPath} fails: ${e.getMessage}", e)
-        None
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
