This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new e039434  [GRIFFIN-286] Remove spark-testing-base dependency jar
e039434 is described below

commit e039434610aa1794f43e20e089f6362ecf015163
Author: wankunde <[email protected]>
AuthorDate: Mon Sep 9 08:28:19 2019 +0800

    [GRIFFIN-286] Remove spark-testing-base dependency jar
    
    Now we use spark-testing-base jar to test spark job in measure module, but 
this jar maybe conflict with the spark version(CDH spark version,spark AE) or 
scala version(few scala version with specified spark version).
    
    So I suggest removing the dependency of this package.
    
    Author: wankunde <[email protected]>
    
    Closes #531 from wankunde/remoteSparkTestBase.
---
 measure/pom.xml                                    | 16 +++--
 .../apache/griffin/measure/SparkSuiteBase.scala    | 71 ++++++++++++++++++++++
 .../measure/context/DataFrameCacheTest.scala       | 11 ++--
 .../griffin/measure/job/BatchDQAppTest.scala       | 30 ++++-----
 .../apache/griffin/measure/sink/SinkTestBase.scala |  8 +--
 .../griffin/measure/step/TransformStepTest.scala   |  4 +-
 .../AccuracyTransformationsIntegrationTest.scala   |  8 +--
 7 files changed, 106 insertions(+), 42 deletions(-)

diff --git a/measure/pom.xml b/measure/pom.xml
index d988fa3..c68cdd7 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -45,7 +45,6 @@ under the License.
         <log4j.version>1.2.16</log4j.version>
         <curator.version>2.10.0</curator.version>
         <scalamock.version>3.6.0</scalamock.version>
-        <spark.testing.version>0.9.0</spark.testing.version>
         <mysql.java.version>5.1.47</mysql.java.version>
         <cassandra.connector.version>2.4.1</cassandra.connector.version>
     </properties>
@@ -73,6 +72,12 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
@@ -173,13 +178,6 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-        <!--spark testing-->
-        <dependency>
-            <groupId>com.holdenkarau</groupId>
-            <artifactId>spark-testing-base_${scala.binary.version}</artifactId>
-            <version>${spark.version}_${spark.testing.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
@@ -255,4 +253,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala 
b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
new file mode 100644
index 0000000..4d6a06f
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.SparkSession
+import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+
+trait SparkSuiteBase extends FlatSpec with BeforeAndAfterAll {
+
+  @transient var spark: SparkSession = _
+  @transient var sc: SparkContext = _
+  @transient var checkpointDir: String = _
+
+  var conf = new SparkConf(false)
+
+  override def beforeAll() {
+    super.beforeAll()
+    cleanTestHiveData()
+    this.spark = SparkSession.builder
+      .master("local[4]")
+      .appName("Griffin Job Suite")
+      .config(conf)
+      .enableHiveSupport()
+      .getOrCreate()
+    sc = this.spark.sparkContext
+  }
+
+  override def afterAll() {
+    try {
+      SparkSession.clearActiveSession()
+      if (spark != null) {
+        spark.stop()
+      }
+      spark = null
+      cleanTestHiveData()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  def cleanTestHiveData(): Unit = {
+    val metastoreDB = new File("metastore_db")
+    if(metastoreDB.exists) {
+      FileUtils.forceDelete(metastoreDB)
+    }
+    val sparkWarehouse = new File("spark-warehouse")
+    if(sparkWarehouse.exists) {
+      FileUtils.forceDelete(sparkWarehouse)
+    }
+  }
+}
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
index 6d6375e..32f085a 100644
--- 
a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -18,12 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
 import org.scalatest._
 
-class DataFrameCacheTest extends FlatSpec with Matchers with 
DataFrameSuiteBase {
+import org.apache.griffin.measure.SparkSuiteBase
+
+class DataFrameCacheTest extends FlatSpec with Matchers with SparkSuiteBase {
 
   def createDataFrame(arr: Seq[Int]): DataFrame = {
     val schema = StructType(Array(
@@ -34,8 +35,8 @@ class DataFrameCacheTest extends FlatSpec with Matchers with 
DataFrameSuiteBase
     val rows = arr.map { i =>
       Row(i.toLong, s"name_$i", i + 15)
     }
-    val rowRdd = sqlContext.sparkContext.parallelize(rows)
-    sqlContext.createDataFrame(rowRdd, schema)
+    val rowRdd = spark.sparkContext.parallelize(rows)
+    spark.createDataFrame(rowRdd, schema)
   }
 
   "data frame cache" should "be able to cache and uncache data frames" in {
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala 
b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 053001a..c1981af 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -19,21 +19,15 @@ under the License.
 package org.apache.griffin.measure.job
 
 import scala.util.{Failure, Success, Try}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.scalatest.BeforeAndAfterAll
-
 import org.apache.griffin.measure.Application.readParamFile
+import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 
-class BatchDQAppTest extends DQAppTest with BeforeAndAfterAll {
+class BatchDQAppTest extends DQAppTest with SparkSuiteBase {
 
   override def beforeAll(): Unit = {
-    super.beforeAll()
-
     envParam = readParamFile[EnvConfig](getConfigFilePath("/env-batch.json")) 
match {
       case Success(p) => p
       case Failure(ex) =>
@@ -45,11 +39,13 @@ class BatchDQAppTest extends DQAppTest with 
BeforeAndAfterAll {
 
     Try {
       // build spark 2.0+ application context
-      var conf = new SparkConf().setAppName("BatchDQApp Test")
+      conf.setAppName("BatchDQApp Test")
       conf.setAll(sparkParam.getConfig)
       conf.set("spark.sql.crossJoin.enabled", "true")
 
-      sparkSession = 
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+      super.beforeAll()
+
+      sparkSession = spark
       val logLevel = getGriffinLogLevel()
       sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
       griffinLogger.setLevel(logLevel)
@@ -60,15 +56,13 @@ class BatchDQAppTest extends DQAppTest with 
BeforeAndAfterAll {
     }
   }
 
-  override def afterAll(): Unit = {
-    super.afterAll()
-    sparkSession.stop()
-  }
-
   def runAndCheckResult(metrics: Map[String, Any]): Unit = {
-    val runResult = dqApp.run
-    assert(runResult.isSuccess)
-    assert(runResult.get)
+    dqApp.run match {
+      case Success(ret) => assert(ret)
+      case Failure(ex) =>
+        error(s"process run error: ${ex.getMessage}", ex)
+        throw ex
+    }
 
     // check Result Metrics
     val dqContext = dqApp.asInstanceOf[BatchDQApp].dqContext
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala 
b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index a88f1ee..69b7e74 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
@@ -29,8 +28,9 @@ import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
+import org.apache.griffin.measure.SparkSuiteBase
 
-trait SinkTestBase extends FlatSpec with Matchers with DataFrameSuiteBase with 
Loggable {
+trait SinkTestBase extends FlatSpec with Matchers with SparkSuiteBase with 
Loggable {
 
   var sinkParams: Seq[SinkParam]
 
@@ -55,7 +55,7 @@ trait SinkTestBase extends FlatSpec with Matchers with 
DataFrameSuiteBase with L
     val rows = arr.map { i =>
       Row(i.toLong, s"name_$i", if (i % 2 == 0) "man" else "women", i + 15)
     }
-    val rowRdd = sqlContext.sparkContext.parallelize(rows)
-    sqlContext.createDataFrame(rowRdd, schema)
+    val rowRdd = spark.sparkContext.parallelize(rows)
+    spark.createDataFrame(rowRdd, schema)
   }
 }
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 5314669..acd5f6b 100644
--- 
a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -18,16 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.step
 
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.scalatest._
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.ContextId
 import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.step.transform.TransformStep
 
-class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase 
with Loggable {
+class TransformStepTest extends FlatSpec with Matchers with SparkSuiteBase 
with Loggable {
 
   case class DualTransformStep(name: String,
                                duration: Int,
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 129f0c5..fd11b93 100644
--- 
a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure.transformations
 
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.spark.sql.DataFrame
 import org.scalatest._
 
@@ -27,12 +26,11 @@ import 
org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
+import org.apache.griffin.measure.SparkSuiteBase
 
 case class AccuracyResult(total: Long, miss: Long, matched: Long, 
matchedFraction: Double)
 
-class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers 
with DataFrameSuiteBase {
-  import spark.implicits._
-
+class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers 
with SparkSuiteBase {
   private val EMPTY_PERSON_TABLE = "empty_person"
   private val PERSON_TABLE = "person"
 
@@ -100,6 +98,8 @@ class AccuracyTransformationsIntegrationTest extends 
FlatSpec with Matchers with
       rule = "source.name = target.name"
     )
 
+    val spark = this.spark
+    import spark.implicits._
     val res = getRuleResults(dqContext, accuracyRule)
       .as[AccuracyResult]
       .collect()

Reply via email to