This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 557e45c [GRIFFIN-272] Abort the application if data source fail to
load data.
557e45c is described below
commit 557e45cdbf8c2ab11c278a18eaf5fde8b686e77e
Author: wankunde <[email protected]>
AuthorDate: Mon Jul 29 21:49:35 2019 +0800
[GRIFFIN-272] Abort the application if data source fail to load data.
If the data source fails to load data, I think the current dq app should
not continue to run.
The rest of the compute steps are unnecessary.
Author: wankunde <[email protected]>
Closes #517 from wankunde/datasource.
---
.../griffin/measure/datasource/DataSource.scala | 22 ++++++++++++++--------
.../connector/batch/AvroBatchDataConnector.scala | 7 ++-----
.../connector/batch/HiveBatchDataConnector.scala | 6 +-----
.../batch/TextDirBatchDataConnector.scala | 7 ++-----
4 files changed, 19 insertions(+), 23 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index f2cd0ec..fd94e9d 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -48,15 +48,21 @@ case class DataSource(name: String,
def loadData(context: DQContext): TimeRange = {
info(s"load data [${name}]")
- val timestamp = context.contextId.timestamp
- val (dfOpt, timeRange) = data(timestamp)
- dfOpt match {
- case Some(df) =>
- context.runTimeTableRegister.registerTable(name, df)
- case None =>
- warn(s"load data source [${name}] fails")
+ try {
+ val timestamp = context.contextId.timestamp
+ val (dfOpt, timeRange) = data(timestamp)
+ dfOpt match {
+ case Some(df) =>
+ context.runTimeTableRegister.registerTable(name, df)
+ case None =>
+ warn(s"Data source [${name}] is null!")
+ }
+ timeRange
+ } catch {
+ case e =>
+ error(s"load data source [${name}] fails")
+ throw e
}
- timeRange
}
private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 09c96d5..457eaa9 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -53,15 +53,12 @@ case class AvroBatchDataConnector(@transient sparkSession:
SparkSession,
}
def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
+ assert(fileExist(), s"Avro file ${concreteFileFullPath} is not exists!")
+ val dfOpt = {
val df =
sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
val dfOpt = Some(df)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load avro file ${concreteFileFullPath} fails", e)
- None
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index 91ab07d..a7926b2 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -47,17 +47,13 @@ case class HiveBatchDataConnector(@transient sparkSession:
SparkSession,
val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
+ val dfOpt = {
val dtSql = dataSql
info(dtSql)
val df = sparkSession.sql(dtSql)
val dfOpt = Some(df)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load hive table ${concreteTableName} fails: ${e.getMessage}",
e)
- None
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index e21335e..85b4774 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -53,7 +53,8 @@ case class TextDirBatchDataConnector(@transient sparkSession:
SparkSession,
}
def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
+ assert(dirExist(), s"Text dir ${dirPath} is not exists!")
+ val dfOpt = {
val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
// touch done file for read dirs
dataDirs.foreach(dir => touchDone(dir))
@@ -68,10 +69,6 @@ case class TextDirBatchDataConnector(@transient
sparkSession: SparkSession,
} else {
None
}
- } catch {
- case e: Throwable =>
- error(s"load text dir ${dirPath} fails: ${e.getMessage}", e)
- None
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))