Github user ashutakGG commented on a diff in the pull request:
https://github.com/apache/incubator-griffin/pull/434#discussion_r224438517
--- Diff: measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala ---
@@ -0,0 +1,169 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.transformations
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.apache.spark.sql.DataFrame
+import org.scalatest._
+
+import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+
+case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
+
+class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with DataFrameSuiteBase {
+  import spark.implicits._
+
+  private val EMPTY_PERSON_TABLE = "empty_person"
+  private val PERSON_TABLE = "person"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    createPersonTable()
+    createEmptyPersonTable()
+
+    // the generated accuracy SQL can produce cross joins (e.g. when combining
+    // aggregated counts), which Spark 2.x rejects unless this flag is enabled
+    spark.conf.set("spark.sql.crossJoin.enabled", "true")
+  }
+
+ "accuracy" should "basically work" in {
+ checkAccuracy(
+ sourceName = PERSON_TABLE,
+ targetName = PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 2, miss = 0, matched = 2,
matchedFraction = 1.0))
+ }
+
+ "accuracy" should "work with empty target" in {
+ checkAccuracy(
+ sourceName = PERSON_TABLE,
+ targetName = EMPTY_PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 2, miss = 2, matched = 0,
matchedFraction = 0.0)
+ )
+ }
+
+ "accuracy" should "work with empty source" in {
+ checkAccuracy(
+ sourceName = EMPTY_PERSON_TABLE,
+ targetName = PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0,
matchedFraction = 1.0))
+ }
+
+ "accuracy" should "work with empty source and target" in {
+ checkAccuracy(
+ sourceName = EMPTY_PERSON_TABLE,
+ targetName = EMPTY_PERSON_TABLE,
+ expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0,
matchedFraction = 1.0))
+ }
+
+  private def checkAccuracy(sourceName: String, targetName: String, expectedResult: AccuracyResult) = {
+    val dqContext: DQContext = getDqContext(
+      dataSourcesParam = List(
+        DataSourceParam(
+          name = "source",
+          connectors = List(dataConnectorParam(tableName = sourceName))
+        ),
+        DataSourceParam(
+          name = "target",
+          connectors = List(dataConnectorParam(tableName = targetName))
+        )
+      ))
+
+    val accuracyRule = RuleParam(
+      dslType = "griffin-dsl",
+      dqType = "ACCURACY",
+      outDfName = "person_accuracy",
+      rule = "source.name = target.name"
+    )
+
+    val res = getRuleResults(dqContext, accuracyRule)
+      .as[AccuracyResult]
+      .collect()
+
+    res.length shouldBe 1
+
+    res(0) shouldEqual expectedResult
+  }
+
+  private def getRuleResults(dqContext: DQContext, rule: RuleParam): DataFrame = {
+    val dqJob = DQJobBuilder.buildDQJob(
+      dqContext,
+      evaluateRuleParam = EvaluateRuleParam(List(rule))
+    )
+
+    dqJob.execute(dqContext)
+
+    spark.sql(s"select * from ${rule.getOutDfName()}")
+  }
+
+  private def createPersonTable(): Unit = {
+    val personCsvPath = getClass.getResource("/hive/person_table.csv").getFile
+
+    spark.sql(
+      s"CREATE TABLE IF NOT EXISTS ${PERSON_TABLE} " +
+        "( " +
+        " name String," +
+        " age int " +
+        ") " +
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " +
+        "STORED AS TEXTFILE"
+    )
+
+    spark.sql(s"LOAD DATA LOCAL INPATH '$personCsvPath' OVERWRITE INTO TABLE ${PERSON_TABLE}")
+  }
+
+  private def createEmptyPersonTable(): Unit = {
+    spark.sql(
+      s"CREATE TABLE IF NOT EXISTS ${EMPTY_PERSON_TABLE} " +
+        "( " +
+        " name String," +
+        " age int " +
+        ") " +
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " +
+        "STORED AS TEXTFILE"
+    )
+
+    spark.sql(s"select * from ${EMPTY_PERSON_TABLE}").show()
--- End diff ---
Yeah, fixed it by dropping the tables.
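
As a rough sketch of that cleanup (assuming the drop happens in afterAll(); the actual commit may differ):

    override def afterAll(): Unit = {
      // drop the Hive tables created in beforeAll() so reruns start from a clean metastore
      spark.sql(s"DROP TABLE IF EXISTS $PERSON_TABLE")
      spark.sql(s"DROP TABLE IF EXISTS $EMPTY_PERSON_TABLE")
      super.afterAll()  // then let the base suite run its own cleanup
    }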
---