This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 077003a [GRIFFIN-283] Move sink steps into TransformStep
077003a is described below
commit 077003ad786d3388d16a713d9cc837dd7e8e3440
Author: wankunde <[email protected]>
AuthorDate: Tue Aug 27 07:42:13 2019 +0800
[GRIFFIN-283] Move sink steps into TransformStep
Treat sink steps as part of a transform step, so we can keep the focus on
the transform step code.
Also, the sink steps and some other transform steps can be executed
concurrently.
Author: wankunde <[email protected]>
Closes #526 from wankunde/sink2.
---
.../step/builder/DataFrameOpsDQStepBuilder.scala | 2 +-
.../step/builder/SparkSqlDQStepBuilder.scala | 2 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 49 +++++-----
.../dsl/transform/CompletenessExpr2DQSteps.scala | 22 ++---
.../dsl/transform/DistinctnessExpr2DQSteps.scala | 104 ++++++++++++---------
.../step/builder/dsl/transform/Expr2DQSteps.scala | 4 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 5 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 47 +++++-----
.../dsl/transform/UniquenessExpr2DQSteps.scala | 34 ++++---
.../step/transform/DataFrameOpsTransformStep.scala | 9 +-
.../step/transform/SparkSqlTransformStep.scala | 18 ++--
.../griffin/measure/step/TransformStepTest.scala | 5 +-
12 files changed, 163 insertions(+), 138 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
index 796c797..743b05d 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
@@ -29,7 +29,7 @@ case class DataFrameOpsDQStepBuilder() extends
RuleParamStepBuilder {
val name = getStepName(ruleParam.getOutDfName())
val inputDfName = getStepName(ruleParam.getInDfName())
val transformStep = DataFrameOpsTransformStep(
- name, inputDfName, ruleParam.getRule, ruleParam.getDetails,
ruleParam.getCache)
+ name, inputDfName, ruleParam.getRule, ruleParam.getDetails, None,
ruleParam.getCache)
transformStep +: buildDirectWriteSteps(ruleParam)
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
index b5dfd0c..0fdf20a 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
@@ -28,7 +28,7 @@ case class SparkSqlDQStepBuilder() extends
RuleParamStepBuilder {
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
val name = getStepName(ruleParam.getOutDfName())
val transformStep = SparkSqlTransformStep(
- name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+ name, ruleParam.getRule, ruleParam.getDetails, None, ruleParam.getCache)
transformStep +: buildDirectWriteSteps(ruleParam)
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 31eef69..3bb5737 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -81,25 +81,22 @@ case class AccuracyExpr2DQSteps(context: DQContext,
s"SELECT ${selClause} FROM `${sourceName}` " +
s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
}
- val missRecordsTransStep =
- SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap,
true)
val missRecordsWriteSteps = procType match {
case BatchProcessType =>
val rwName =
ruleParam.getOutputOpt(RecordOutputType).
flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
- RecordWriteStep(rwName, missRecordsTableName) :: Nil
- case StreamingProcessType => Nil
- }
- val missRecordsUpdateWriteSteps = procType match {
- case BatchProcessType => Nil
+ RecordWriteStep(rwName, missRecordsTableName)
case StreamingProcessType =>
val dsName =
ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
- DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
+ DataSourceUpdateWriteStep(dsName, missRecordsTableName)
}
+ val missRecordsTransStep =
+ SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap,
Some(missRecordsWriteSteps), true)
+
// 2. miss count
val missCountTableName = "__missCount"
val missColName = details.getStringOrKey(_miss)
@@ -151,23 +148,23 @@ case class AccuracyExpr2DQSteps(context: DQContext,
|ON `${totalCountTableName}`.`${ConstantColumns.tmst}` =
`${missCountTableName}`.`${ConstantColumns.tmst}`
""".stripMargin
}
- val accuracyTransStep = SparkSqlTransformStep(accuracyTableName,
accuracyMetricSql, emptyMap)
- accuracyTransStep.parentSteps += missCountTransStep
- accuracyTransStep.parentSteps += totalCountTransStep
- val accuracyMetricWriteSteps = procType match {
+
+ val accuracyMetricWriteStep = procType match {
case BatchProcessType =>
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
- MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
- case StreamingProcessType => Nil
+ Some(MetricWriteStep(mwName, accuracyTableName, flattenType))
+ case StreamingProcessType => None
}
- val batchWriteSteps =
- accuracyMetricWriteSteps ++ missRecordsWriteSteps ++
missRecordsUpdateWriteSteps
+ val accuracyTransStep =
+ SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap,
accuracyMetricWriteStep)
+ accuracyTransStep.parentSteps += missCountTransStep
+ accuracyTransStep.parentSteps += totalCountTransStep
procType match {
- case BatchProcessType => accuracyTransStep :: batchWriteSteps
+ case BatchProcessType => accuracyTransStep :: Nil
// streaming extra steps
case StreamingProcessType =>
// 5. accuracy metric merge
@@ -178,15 +175,16 @@ case class AccuracyExpr2DQSteps(context: DQContext,
(AccuracyOprKeys._total -> totalColName),
(AccuracyOprKeys._matched -> matchedColName)
)
- val accuracyMetricTransStep =
DataFrameOpsTransformStep(accuracyMetricTableName,
- accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
- accuracyMetricTransStep.parentSteps += accuracyTransStep
val accuracyMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
}
+ val accuracyMetricTransStep =
DataFrameOpsTransformStep(accuracyMetricTableName,
+ accuracyTableName, accuracyMetricRule, accuracyMetricDetails,
Some(accuracyMetricWriteStep))
+ accuracyMetricTransStep.parentSteps += accuracyTransStep
+
// 6. collect accuracy records
val accuracyRecordTableName = "__accuracyRecords"
@@ -196,9 +194,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
|FROM `${accuracyMetricTableName}` WHERE
`${ConstantColumns.record}`
""".stripMargin
}
- val accuracyRecordTransStep = SparkSqlTransformStep(
- accuracyRecordTableName, accuracyRecordSql, emptyMap)
- accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
+
val accuracyRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -206,10 +202,11 @@ case class AccuracyExpr2DQSteps(context: DQContext,
RecordWriteStep(rwName, missRecordsTableName,
Some(accuracyRecordTableName))
}
+ val accuracyRecordTransStep = SparkSqlTransformStep(
+ accuracyRecordTableName, accuracyRecordSql, emptyMap,
Some(accuracyRecordWriteStep))
+ accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
- // extra steps
- val streamingWriteSteps = accuracyMetricWriteStep ::
accuracyRecordWriteStep :: Nil
- accuracyRecordTransStep :: batchWriteSteps ++ streamingWriteSteps
+ accuracyRecordTransStep :: Nil
}
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 3df4a12..7312f29 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -81,7 +81,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
s"SELECT ${selClause} FROM `${sourceName}`"
}
val sourceAliasTransStep =
- SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap,
true)
+ SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap,
None, true)
// 2. incomplete record
val incompleteRecordsTableName = "__incompleteRecords"
@@ -91,15 +91,17 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val incompleteRecordsSql =
s"SELECT * FROM `${sourceAliasTableName}` WHERE
${incompleteWhereClause}"
- val incompleteRecordTransStep =
- SparkSqlTransformStep(incompleteRecordsTableName,
incompleteRecordsSql, emptyMap, true)
- incompleteRecordTransStep.parentSteps += sourceAliasTransStep
val incompleteRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
.getOrElse(incompleteRecordsTableName)
RecordWriteStep(rwName, incompleteRecordsTableName)
}
+ val incompleteRecordTransStep =
+ SparkSqlTransformStep(incompleteRecordsTableName,
incompleteRecordsSql, emptyMap,
+ Some(incompleteRecordWriteStep), true)
+ incompleteRecordTransStep.parentSteps += sourceAliasTransStep
+
// 3. incomplete count
val incompleteCountTableName = "__incompleteCount"
@@ -149,21 +151,19 @@ case class CompletenessExpr2DQSteps(context: DQContext,
|ON `${totalCountTableName}`.`${ConstantColumns.tmst}` =
`${incompleteCountTableName}`.`${ConstantColumns.tmst}`
""".stripMargin
}
- val completeTransStep = SparkSqlTransformStep(completeTableName,
completeMetricSql, emptyMap)
- completeTransStep.parentSteps += incompleteCountTransStep
- completeTransStep.parentSteps += totalCountTransStep
val completeWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, completeTableName, flattenType)
}
+ val completeTransStep =
+ SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap,
Some(completeWriteStep))
+ completeTransStep.parentSteps += incompleteCountTransStep
+ completeTransStep.parentSteps += totalCountTransStep
val transSteps = completeTransStep :: Nil
- val writeSteps = incompleteRecordWriteStep :: completeWriteStep :: Nil
-
- // full steps
- transSteps ++ writeSteps
+ transSteps
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 0e2b10e..65460c3 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -102,7 +102,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
s"SELECT ${selClause} FROM `${sourceName}`"
}
val sourceAliasTransStep =
- SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap,
true)
+ SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap,
None, true)
// 2. total metric
val totalTableName = "__totalMetric"
@@ -110,11 +110,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
val totalSql = {
s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
}
- val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql,
emptyMap)
- totalTransStep.parentSteps += sourceAliasTransStep
val totalMetricWriteStep = {
MetricWriteStep(totalColName, totalTableName, EntriesFlattenType,
writeTimestampOpt)
}
+ val totalTransStep =
+ SparkSqlTransformStep(totalTableName, totalSql, emptyMap,
Some(totalMetricWriteStep))
+ totalTransStep.parentSteps += sourceAliasTransStep
// 3. group by self
val selfGroupTableName = "__selfGroup"
@@ -128,13 +129,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val selfGroupTransStep =
- SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+ SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap,
None, true)
selfGroupTransStep.parentSteps += sourceAliasTransStep
val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
- val writeSteps1 = totalMetricWriteStep :: Nil
- val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
+ val (transSteps2, dupCountTableName) = procType match {
case StreamingProcessType if (withOlderTable) =>
// 4.0 update old data
val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName,
targetName)
@@ -208,14 +208,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val finalDupCountTransStep =
- SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql,
emptyMap, true)
+ SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql,
emptyMap, None, true)
finalDupCountTransStep.parentSteps += groupTransStep
- ((finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil),
- finalDupCountTableName)
+ (finalDupCountTransStep :: targetDsUpdateWriteStep :: Nil,
finalDupCountTableName)
case _ =>
- ((selfGroupTransStep :: Nil, totalMetricWriteStep :: Nil),
- selfGroupTableName)
+ (selfGroupTransStep :: Nil, selfGroupTableName)
}
// 8. distinct metric
@@ -227,16 +225,16 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${dupCountTableName}` WHERE `${ConstantColumns.distinct}`
""".stripMargin
}
- val distTransStep = SparkSqlTransformStep(distTableName, distSql,
emptyMap)
val distMetricWriteStep = {
MetricWriteStep(distColName, distTableName, EntriesFlattenType,
writeTimestampOpt)
}
+ val distTransStep =
+ SparkSqlTransformStep(distTableName, distSql, emptyMap,
Some(distMetricWriteStep))
val transSteps3 = distTransStep :: Nil
- val writeSteps3 = distMetricWriteStep :: Nil
val duplicationArrayName = details.getString(_duplicationArray, "")
- val (transSteps4, writeSteps4) = if (duplicationArrayName.nonEmpty) {
+ val transSteps4 = if (duplicationArrayName.nonEmpty) {
val recordEnable = details.getBoolean(_recordEnable, false)
if (groupAliases.size > 0) {
// with some group by requirement
@@ -278,12 +276,23 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|WHERE NOT `${ConstantColumns.distinct}` OR
`${ConstantColumns.rowNumber}` > 1
""".stripMargin
}
- val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName,
dupItemsSql, emptyMap)
- dupItemsTransStep.parentSteps += rnTransStep
val dupItemsWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
}
+ val dupItemsTransStep = {
+ if (recordEnable) {
+ SparkSqlTransformStep(
+ dupItemsTableName,
+ dupItemsSql,
+ emptyMap,
+ Some(dupItemsWriteStep)
+ )
+ } else {
+ SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
+ }
+ }
+ dupItemsTransStep.parentSteps += rnTransStep
// 12. group by dup Record metric
val groupDupMetricTableName = "__groupDupMetric"
@@ -295,25 +304,22 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${dupItemsTableName}` GROUP BY ${groupSelClause},
`${dupColName}`
""".stripMargin
}
- val groupDupMetricTransStep =
- SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql,
emptyMap)
- groupDupMetricTransStep.parentSteps += dupItemsTransStep
val groupDupMetricWriteStep = {
MetricWriteStep(duplicationArrayName,
groupDupMetricTableName,
ArrayFlattenType,
writeTimestampOpt)
}
+ val groupDupMetricTransStep =
+ SparkSqlTransformStep(
+ groupDupMetricTableName,
+ groupDupMetricSql,
+ emptyMap,
+ Some(groupDupMetricWriteStep)
+ )
+ groupDupMetricTransStep.parentSteps += dupItemsTransStep
- val msteps = groupDupMetricTransStep :: Nil
- val wsteps = if (recordEnable) {
- dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
- } else {
- groupDupMetricWriteStep :: Nil
- }
-
- (msteps, wsteps)
-
+ groupDupMetricTransStep :: Nil
} else {
// no group by requirement
// 9. duplicate record
@@ -330,16 +336,25 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
""".stripMargin
}
- val dupRecordTransStep =
- SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap,
true)
-
val dupRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
.getOrElse(dupRecordTableName)
-
RecordWriteStep(rwName, dupRecordTableName, None,
writeTimestampOpt)
}
+ val dupRecordTransStep = {
+ if (recordEnable) {
+ SparkSqlTransformStep(
+ dupRecordTableName,
+ dupRecordSql,
+ emptyMap,
+ Some(dupRecordWriteStep),
+ true
+ )
+ } else {
+ SparkSqlTransformStep(dupRecordTableName, dupRecordSql,
emptyMap, None, true)
+ }
+ }
// 10. duplicate metric
val dupMetricTableName = "__dupMetric"
@@ -350,8 +365,6 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
""".stripMargin
}
- val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName,
dupMetricSql, emptyMap)
- dupMetricTransStep.parentSteps += dupRecordTransStep
val dupMetricWriteStep = {
MetricWriteStep(
duplicationArrayName,
@@ -360,22 +373,21 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
writeTimestampOpt
)
}
+ val dupMetricTransStep =
+ SparkSqlTransformStep(
+ dupMetricTableName,
+ dupMetricSql,
+ emptyMap,
+ Some(dupMetricWriteStep)
+ )
+ dupMetricTransStep.parentSteps += dupRecordTransStep
- val msteps = dupMetricTransStep :: Nil
- val wsteps = if (recordEnable) {
- dupRecordWriteStep :: dupMetricWriteStep :: Nil
- } else {
- dupMetricWriteStep :: Nil
- }
-
- (msteps, wsteps)
+ dupMetricTransStep :: Nil
}
- } else (Nil, Nil)
+ } else Nil
// full steps
- transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
- writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
-
+ transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index 492f4fd..e9a65b4 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -21,9 +21,10 @@ package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums._
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.dsl.expr.Expr
+import org.apache.griffin.measure.step.write.{MetricWriteStep,
RecordWriteStep, WriteStep}
trait Expr2DQSteps extends Loggable with Serializable {
@@ -31,7 +32,6 @@ trait Expr2DQSteps extends Loggable with Serializable {
protected val emptyMap = Map[String, Any]()
def getDQSteps(): Seq[DQStep]
-
}
/**
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index af493af..68ca2f4 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -97,14 +97,15 @@ case class ProfilingExpr2DQSteps(context: DQContext,
s"${fromClause} ${preGroupbyClause} ${groupbyClause}
${postGroupbyClause}"
}
val profilingName = ruleParam.getOutDfName()
- val profilingTransStep = SparkSqlTransformStep(profilingName,
profilingSql, details)
val profilingMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, profilingName, flattenType)
}
- profilingTransStep :: profilingMetricWriteStep :: Nil
+ val profilingTransStep =
+ SparkSqlTransformStep(profilingName, profilingSql, details,
Some(profilingMetricWriteStep))
+ profilingTransStep :: Nil
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 03c2c8d..5a3acfb 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -104,7 +104,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS
`${latencyColName}` " +
s"FROM `${inTimeTableName}`"
}
- val latencyTransStep = SparkSqlTransformStep(latencyTableName,
latencySql, emptyMap, true)
+ val latencyTransStep = SparkSqlTransformStep(latencyTableName,
latencySql, emptyMap, None, true)
latencyTransStep.parentSteps += inTimeTransStep
// 3. timeliness metric
@@ -129,27 +129,26 @@ case class TimelinessExpr2DQSteps(context: DQContext,
|GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
}
- val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql,
emptyMap)
- metricTransStep.parentSteps += latencyTransStep
val metricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, metricTableName, flattenType)
}
+ val metricTransStep =
+ SparkSqlTransformStep(metricTableName, metricSql, emptyMap,
Some(metricWriteStep))
+ metricTransStep.parentSteps += latencyTransStep
// current steps
val transSteps1 = metricTransStep :: Nil
- val writeSteps1 = metricWriteStep :: Nil
// 4. timeliness record
- val (transSteps2, writeSteps2) =
TimeUtil.milliseconds(details.getString(_threshold, "")) match {
+ val transSteps2 = TimeUtil.milliseconds(details.getString(_threshold,
"")) match {
case Some(tsh) =>
val recordTableName = "__lateRecords"
val recordSql = {
s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` >
${tsh}"
}
- val recordTransStep = SparkSqlTransformStep(recordTableName,
recordSql, emptyMap)
val recordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -157,12 +156,16 @@ case class TimelinessExpr2DQSteps(context: DQContext,
RecordWriteStep(rwName, recordTableName, None)
}
- (recordTransStep :: Nil, recordWriteStep :: Nil)
- case _ => (Nil, Nil)
+ val recordTransStep =
+ SparkSqlTransformStep(recordTableName, recordSql, emptyMap,
Some(recordWriteStep))
+ recordTransStep.parentSteps += latencyTransStep
+
+ recordTransStep :: Nil
+ case _ => Nil
}
// 5. ranges
- val (transSteps3, writeSteps3) =
TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
+ val transSteps3 = TimeUtil.milliseconds(details.getString(_stepSize,
"")) match {
case Some(stepSize) =>
// 5.1 range
val rangeTableName = "__range"
@@ -174,6 +177,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val rangeTransStep = SparkSqlTransformStep(rangeTableName, rangeSql,
emptyMap)
+ rangeTransStep.parentSteps += latencyTransStep
// 5.2 range metric
val rangeMetricTableName = "__rangeMetric"
@@ -190,20 +194,20 @@ case class TimelinessExpr2DQSteps(context: DQContext,
|FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`,
`${stepColName}`
""".stripMargin
}
- val rangeMetricTransStep =
- SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql,
emptyMap)
- rangeMetricTransStep.parentSteps += rangeTransStep
val rangeMetricWriteStep = {
MetricWriteStep(stepColName, rangeMetricTableName,
ArrayFlattenType)
}
+ val rangeMetricTransStep =
+ SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql,
emptyMap, Some(rangeMetricWriteStep))
+ rangeMetricTransStep.parentSteps += rangeTransStep
- (rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
- case _ => (Nil, Nil)
+ rangeMetricTransStep :: Nil
+ case _ => Nil
}
// 6. percentiles
val percentiles = getPercentiles(details)
- val (transSteps4, writeSteps4) = if (percentiles.size > 0) {
+ val transSteps4 = if (percentiles.size > 0) {
val percentileTableName = "__percentile"
val percentileColName = details.getStringOrKey(_percentileColPrefix)
val percentileCols = percentiles.map { pct =>
@@ -217,19 +221,18 @@ case class TimelinessExpr2DQSteps(context: DQContext,
|FROM `${latencyTableName}`
""".stripMargin
}
- val percentileTransStep =
- SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
-
val percentileWriteStep = {
MetricWriteStep(percentileTableName, percentileTableName,
DefaultFlattenType)
}
+ val percentileTransStep =
+ SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap,
Some(percentileWriteStep))
+ percentileTransStep.parentSteps += latencyTransStep
- (percentileTransStep :: Nil, percentileWriteStep :: Nil)
- } else (Nil, Nil)
+ percentileTransStep :: Nil
+ } else Nil
// full steps
- transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
- writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
+ transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 7f259ea..a19b35c 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -117,7 +117,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
}
- val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql,
emptyMap, true)
+ val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql,
emptyMap, None, true)
groupTransStep.parentSteps += joinedTransStep
// 5. total metric
@@ -131,8 +131,9 @@ case class UniquenessExpr2DQSteps(context: DQContext,
|FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
}
- val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql,
emptyMap)
val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName,
EntriesFlattenType)
+ val totalTransStep =
+ SparkSqlTransformStep(totalTableName, totalSql, emptyMap,
Some(totalMetricWriteStep))
// 6. unique record
val uniqueRecordTableName = "__uniqueRecord"
@@ -155,24 +156,21 @@ case class UniquenessExpr2DQSteps(context: DQContext,
|FROM `${uniqueRecordTableName}` GROUP BY
`${ConstantColumns.tmst}`
""".stripMargin
}
- val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql,
emptyMap)
- uniqueTransStep.parentSteps += uniqueRecordTransStep
-
val uniqueMetricWriteStep =
MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
+ val uniqueTransStep =
+ SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap,
Some(uniqueMetricWriteStep))
+ uniqueTransStep.parentSteps += uniqueRecordTransStep
val transSteps1 = totalTransStep :: uniqueTransStep :: Nil
- val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil
val duplicationArrayName = details.getString(_duplicationArray, "")
- val (transSteps2, writeSteps2) = if (duplicationArrayName.nonEmpty) {
+ val transSteps2 = if (duplicationArrayName.nonEmpty) {
// 8. duplicate record
val dupRecordTableName = "__dupRecords"
val dupRecordSql = {
s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
}
- val dupRecordTransStep =
- SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap,
true)
val dupRecordWriteStep = {
val rwName =
@@ -181,6 +179,8 @@ case class UniquenessExpr2DQSteps(context: DQContext,
RecordWriteStep(rwName, dupRecordTableName)
}
+ val dupRecordTransStep =
+ SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap,
Some(dupRecordWriteStep), true)
// 9. duplicate metric
val dupMetricTableName = "__dupMetric"
@@ -201,18 +201,22 @@ case class UniquenessExpr2DQSteps(context: DQContext,
|GROUP BY ${dupMetricGroupbyClause}
""".stripMargin
}
- val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName,
dupMetricSql, emptyMap)
- dupMetricTransStep.parentSteps += dupRecordTransStep
val dupMetricWriteStep = {
MetricWriteStep(duplicationArrayName, dupMetricTableName,
ArrayFlattenType)
}
+ val dupMetricTransStep =
+ SparkSqlTransformStep(dupMetricTableName,
+ dupMetricSql,
+ emptyMap,
+ Some(dupMetricWriteStep)
+ )
+ dupMetricTransStep.parentSteps += dupRecordTransStep
- (dupMetricTransStep :: Nil,
- dupRecordWriteStep :: dupMetricWriteStep :: Nil)
- } else (Nil, Nil)
+ dupMetricTransStep :: Nil
+ } else Nil
// full steps
- transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
+ transSteps1 ++ transSteps2
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index b07595a..c393706 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -19,14 +19,16 @@ under the License.
package org.apache.griffin.measure.step.transform
import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.WriteStep
/**
* data frame ops transform step
*/
-case class DataFrameOpsTransformStep(name: String,
+case class DataFrameOpsTransformStep[T <: WriteStep](name: String,
inputDfName: String,
rule: String,
details: Map[String, Any],
+ writeStepOpt: Option[T] = None,
cache: Boolean = false
) extends TransformStep {
@@ -43,7 +45,10 @@ case class DataFrameOpsTransformStep(name: String,
}
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
- true
+ writeStepOpt match {
+ case Some(writeStep) => writeStep.execute(context)
+ case None => true
+ }
} catch {
case e: Throwable =>
error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}", e)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 59ea822..00edf07 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -19,23 +19,27 @@ under the License.
package org.apache.griffin.measure.step.transform
import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.WriteStep
/**
* spark sql transform step
*/
-case class SparkSqlTransformStep(name: String,
- rule: String,
- details: Map[String, Any],
- cache: Boolean = false
- ) extends TransformStep {
-
+case class SparkSqlTransformStep[T <: WriteStep](name: String,
+ rule: String,
+ details: Map[String, Any],
+ writeStepOpt: Option[T] =
None,
+ cache: Boolean = false
+ ) extends TransformStep {
def doExecute(context: DQContext): Boolean = {
val sqlContext = context.sqlContext
try {
val df = sqlContext.sql(rule)
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
- true
+ writeStepOpt match {
+ case Some(writeStep) => writeStep.execute(context)
+ case None => true
+ }
} catch {
case e: Throwable =>
error(s"run spark sql [ ${rule} ] error: ${e.getMessage}", e)
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index e640c45..5314669 100644
---
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -19,15 +19,14 @@ under the License.
package org.apache.griffin.measure.step
import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.scalatest._
+import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.step.transform.TransformStep
-import org.scalatest._
-
class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase
with Loggable {
case class DualTransformStep(name: String,