This is an automated email from the ASF dual-hosted git repository.
wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new cef0a0e [GRIFFIN-304] Eliminate older contexts
cef0a0e is described below
commit cef0a0ec9b9265798bc4e99c1903aa8b68741a04
Author: chitralverma <[email protected]>
AuthorDate: Sat Nov 30 13:01:30 2019 +0800
[GRIFFIN-304] Eliminate older contexts
**What changes were proposed in this pull request?**
As SparkSession is a direct replacement for SparkContext, SQLContext and
HiveContext, there is no need to pass/instantiate them. If any of the older
contexts are needed, they can be derived from SparkSession.
This issue aims to eliminate dependency on older Contexts in favour of
SparkSession.
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Griffin test suite.
Author: chitralverma <[email protected]>
Closes #557 from chitralverma/eliminate-older-contexts.
---
.../apache/griffin/measure/context/DQContext.scala | 6 ++----
.../griffin/measure/context/TableRegister.scala | 6 +++---
.../measure/datasource/DataSourceFactory.scala | 2 +-
.../datasource/cache/StreamingCacheClient.scala | 8 +++----
.../cache/StreamingCacheClientFactory.scala | 14 ++++++------
.../cache/StreamingCacheJsonClient.scala | 2 +-
.../datasource/cache/StreamingCacheOrcClient.scala | 2 +-
.../cache/StreamingCacheParquetClient.scala | 4 ++--
.../batch/TextDirBatchDataConnector.scala | 2 +-
.../griffin/measure/launch/batch/BatchDQApp.scala | 6 ++----
.../measure/launch/streaming/StreamingDQApp.scala | 7 ++----
.../measure/step/builder/udf/GriffinUDFs.scala | 18 ++++++++--------
.../measure/step/transform/DataFrameOps.scala | 25 +++++++++++-----------
.../step/transform/DataFrameOpsTransformStep.scala | 8 +++----
.../step/transform/SparkSqlTransformStep.scala | 4 ++--
.../step/write/DataSourceUpdateWriteStep.scala | 2 +-
.../measure/step/write/MetricWriteStep.scala | 2 +-
.../measure/step/write/RecordWriteStep.scala | 2 +-
.../griffin/measure/job/BatchDQAppTest.scala | 3 +--
19 files changed, 58 insertions(+), 65 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 2fdf409..8069632 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
@@ -37,10 +37,8 @@ case class DQContext(contextId: ContextId,
procType: ProcessType
)(@transient implicit val sparkSession: SparkSession) {
- val sqlContext: SQLContext = sparkSession.sqlContext
-
val compileTableRegister: CompileTableRegister = CompileTableRegister()
- val runTimeTableRegister: RunTimeTableRegister =
RunTimeTableRegister(sqlContext)
+ val runTimeTableRegister: RunTimeTableRegister =
RunTimeTableRegister(sparkSession)
val dataFrameCache: DataFrameCache = DataFrameCache()
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index c4dda3b..d1d9dbe 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -61,7 +61,7 @@ case class CompileTableRegister() extends TableRegister {}
/**
* register table name and create temp view during calculation
*/
-case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends
TableRegister {
+case class RunTimeTableRegister(@transient sparkSession: SparkSession) extends
TableRegister {
def registerTable(name: String, df: DataFrame): Unit = {
registerTable(name)
@@ -70,13 +70,13 @@ case class RunTimeTableRegister(@transient sqlContext:
SQLContext) extends Table
override def unregisterTable(name: String): Unit = {
if (existsTable(name)) {
- sqlContext.dropTempTable(name)
+ sparkSession.catalog.dropTempView(name)
tables -= name
}
}
override def unregisterAllTables(): Unit = {
val uts = getTables
- uts.foreach(t => sqlContext.dropTempTable(t))
+ uts.foreach(t => sparkSession.catalog.dropTempView(t))
tables.clear
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 28e616b..1b1bcf0 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -52,7 +52,7 @@ object DataSourceFactory extends Loggable {
// for streaming data cache
val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
- sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index,
timestampStorage)
+ sparkSession, dataSourceParam.getCheckpointOpt, name, index,
timestampStorage)
val dataConnectors: Seq[DataConnector] = connectorParams.flatMap {
connectorParam =>
DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index a03a468..5bfe41a 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -43,7 +43,7 @@ import org.apache.griffin.measure.utils.ParamUtil._
trait StreamingCacheClient
extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with
Serializable {
- val sqlContext: SQLContext
+ val sparkSession: SparkSession
val param: Map[String, Any]
val dsName: String
val index: Int
@@ -181,7 +181,7 @@ trait StreamingCacheClient
// new cache data
val newDfOpt = try {
- val dfr = sqlContext.read
+ val dfr = sparkSession.read
readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
} catch {
case e: Throwable =>
@@ -194,7 +194,7 @@ trait StreamingCacheClient
val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
val oldDfPath = s"${oldFilePath}/${idx}"
try {
- val dfr = sqlContext.read
+ val dfr = sparkSession.read
readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
} catch {
case e: Throwable =>
@@ -329,7 +329,7 @@ trait StreamingCacheClient
val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
val updateDf = df.filter(filterStr)
- val prlCount = sqlContext.sparkContext.defaultParallelism
+ val prlCount = sparkSession.sparkContext.defaultParallelism
// repartition
val repartitionedDf = updateDf.repartition(prlCount)
val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index eeda8ef..1948438 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.datasource.TimestampStorage
@@ -37,14 +37,14 @@ object StreamingCacheClientFactory extends Loggable {
/**
* create streaming cache client
- * @param sqlContext sqlContext in spark environment
+ * @param sparkSession sparkSession in spark environment
* @param checkpointOpt data source checkpoint/cache config option
* @param name data source name
* @param index data source index
* @param tmstCache the same tmstCache instance inside a data source
* @return streaming cache client option
*/
- def getClientOpt(sqlContext: SQLContext, checkpointOpt: Option[Map[String,
Any]],
+ def getClientOpt(sparkSession: SparkSession, checkpointOpt:
Option[Map[String, Any]],
name: String, index: Int, tmstCache: TimestampStorage
): Option[StreamingCacheClient] = {
checkpointOpt.flatMap { param =>
@@ -52,13 +52,13 @@ object StreamingCacheClientFactory extends Loggable {
val tp = param.getString(_type, "")
val dsCache = tp match {
case ParquetRegex() =>
- StreamingCacheParquetClient(sqlContext, param, name, index,
tmstCache)
+ StreamingCacheParquetClient(sparkSession, param, name, index,
tmstCache)
case JsonRegex() =>
- StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+ StreamingCacheJsonClient(sparkSession, param, name, index,
tmstCache)
case OrcRegex() =>
- StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+ StreamingCacheOrcClient(sparkSession, param, name, index,
tmstCache)
case _ =>
- StreamingCacheParquetClient(sqlContext, param, name, index,
tmstCache)
+ StreamingCacheParquetClient(sparkSession, param, name, index,
tmstCache)
}
Some(dsCache)
} catch {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index c81d4d1..8db918a 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.datasource.TimestampStorage
/**
* data source cache in json format
*/
-case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String,
Any],
+case class StreamingCacheJsonClient(sparkSession: SparkSession, param:
Map[String, Any],
dsName: String, index: Int,
timestampStorage: TimestampStorage
) extends StreamingCacheClient {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 0649b74..5707cfa 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.datasource.TimestampStorage
/**
* data source cache in orc format
*/
-case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String,
Any],
+case class StreamingCacheOrcClient(sparkSession: SparkSession, param:
Map[String, Any],
dsName: String, index: Int,
timestampStorage: TimestampStorage
) extends StreamingCacheClient {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index 9c369ee..699ccc2 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -25,14 +25,14 @@ import
org.apache.griffin.measure.datasource.TimestampStorage
/**
* data source cache in parquet format
*/
-case class StreamingCacheParquetClient(sqlContext: SQLContext,
+case class StreamingCacheParquetClient(sparkSession: SparkSession,
param: Map[String, Any],
dsName: String,
index: Int,
timestampStorage: TimestampStorage
) extends StreamingCacheClient {
-
sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
"false")
+
sparkSession.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
"false")
protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
= {
info(s"write path: ${path}")
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 85b4774..d37f51b 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -98,7 +98,7 @@ case class TextDirBatchDataConnector(@transient sparkSession:
SparkSession,
// def metaData(): Try[Iterable[(String, String)]] = {
// Try {
-// val st = sqlContext.read.format("com.databricks.spark.avro").
+// val st = sparkSession.read.format("com.databricks.spark.avro").
// load(concreteFileFullPath).schema
// st.fields.map(f => (f.name, f.dataType.typeName))
// }
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index c05d043..a4a1d37 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -23,7 +23,7 @@ import java.util.Date
import scala.util.Try
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
@@ -43,7 +43,6 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
val metricName = dqParam.getName
val sinkParams = getSinkParams
- var sqlContext: SQLContext = _
var dqContext: DQContext = _
def retryable: Boolean = false
@@ -57,10 +56,9 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp
{
val logLevel = getGriffinLogLevel()
sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
griffinLogger.setLevel(logLevel)
- sqlContext = sparkSession.sqlContext
// register udf
- GriffinUDFAgent.register(sqlContext)
+ GriffinUDFAgent.register(sparkSession)
}
def run: Try[Boolean] = Try {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index be32eba..57e6f82 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor,
TimeUnit}
import scala.util.Try
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.griffin.measure.Loggable
@@ -49,8 +49,6 @@ case class StreamingDQApp(allParam: GriffinConfig) extends
DQApp {
val metricName = dqParam.getName
val sinkParams = getSinkParams
- var sqlContext: SQLContext = _
-
def retryable: Boolean = true
def init: Try[_] = Try {
@@ -62,7 +60,6 @@ case class StreamingDQApp(allParam: GriffinConfig) extends
DQApp {
val logLevel = getGriffinLogLevel()
sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
griffinLogger.setLevel(logLevel)
- sqlContext = sparkSession.sqlContext
// clear checkpoint directory
clearCpDir
@@ -72,7 +69,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends
DQApp {
OffsetCheckpointClient.init
// register udf
- GriffinUDFAgent.register(sqlContext)
+ GriffinUDFAgent.register(sparkSession)
}
def run: Try[Boolean] = Try {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
index 61b93ab..baa7a8c 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
@@ -18,12 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.udf
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
object GriffinUDFAgent {
- def register(sqlContext: SQLContext): Unit = {
- GriffinUDFs.register(sqlContext)
- GriffinUDAggFs.register(sqlContext)
+ def register(sparkSession: SparkSession): Unit = {
+ GriffinUDFs.register(sparkSession)
+ GriffinUDAggFs.register(sparkSession)
}
}
@@ -32,10 +32,10 @@ object GriffinUDFAgent {
*/
object GriffinUDFs {
- def register(sqlContext: SQLContext): Unit = {
- sqlContext.udf.register("index_of", indexOf _)
- sqlContext.udf.register("matches", matches _)
- sqlContext.udf.register("reg_replace", regReplace _)
+ def register(sparkSession: SparkSession): Unit = {
+ sparkSession.udf.register("index_of", indexOf _)
+ sparkSession.udf.register("matches", matches _)
+ sparkSession.udf.register("reg_replace", regReplace _)
}
private def indexOf(arr: Seq[String], v: String) = {
@@ -57,7 +57,7 @@ object GriffinUDFs {
*/
object GriffinUDAggFs {
- def register(sqlContext: SQLContext): Unit = {
+ def register(sparkSession: SparkSession): Unit = {
}
}
\ No newline at end of file
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index ba64d33..8678db5 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,8 +20,9 @@ package org.apache.griffin.measure.step.transform
import java.util.Date
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.{Encoders, Row, _}
import org.apache.spark.sql.types._
+
import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.streaming.metric._
import
org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
@@ -45,7 +46,7 @@ object DataFrameOps {
val _matchedFraction = "matchedFraction"
}
- def fromJson(sqlContext: SQLContext,
+ def fromJson(sparkSession: SparkSession,
inputDfName: String,
details: Map[String, Any]): DataFrame = {
val _colName = "col.name"
@@ -53,15 +54,15 @@ object DataFrameOps {
implicit val encoder = Encoders.STRING
- val df: DataFrame = sqlContext.table(s"`${inputDfName}`")
+ val df: DataFrame = sparkSession.table(s"`${inputDfName}`")
val rdd = colNameOpt match {
case Some(colName: String) => df.map(r => r.getAs[String](colName))
case _ => df.map(_.getAs[String](0))
}
- sqlContext.read.json(rdd) // slow process
+ sparkSession.read.json(rdd) // slow process
}
- def accuracy(sqlContext: SQLContext,
+ def accuracy(sparkSession: SparkSession,
inputDfName: String,
contextId: ContextId,
details: Map[String, Any]): DataFrame = {
@@ -82,7 +83,7 @@ object DataFrameOps {
}
}
- val df = sqlContext.table(s"`${inputDfName}`")
+ val df = sparkSession.table(s"`${inputDfName}`")
val results = df.rdd.flatMap { row =>
try {
@@ -116,16 +117,16 @@ object DataFrameOps {
val ar = r.result.asInstanceOf[AccuracyMetric]
Row(r.timeStamp, ar.miss, ar.total, ar.getMatch, ar.matchFraction,
!ar.initial, ar.eventual)
}.toArray
- val rowRdd = sqlContext.sparkContext.parallelize(rows)
- val retDf = sqlContext.createDataFrame(rowRdd, schema)
+ val rowRdd = sparkSession.sparkContext.parallelize(rows)
+ val retDf = sparkSession.createDataFrame(rowRdd, schema)
retDf
}
- def clear(sqlContext: SQLContext, inputDfName: String, details: Map[String,
Any]): DataFrame = {
- val df = sqlContext.table(s"`${inputDfName}`")
- val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
- sqlContext.createDataFrame(emptyRdd, df.schema)
+ def clear(sparkSession: SparkSession, inputDfName: String, details:
Map[String, Any]): DataFrame = {
+ val df = sparkSession.table(s"`${inputDfName}`")
+ val emptyRdd = sparkSession.sparkContext.emptyRDD[Row]
+ sparkSession.createDataFrame(emptyRdd, df.schema)
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index c393706..c5bcb13 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -33,14 +33,14 @@ case class DataFrameOpsTransformStep[T <: WriteStep](name:
String,
) extends TransformStep {
def doExecute(context: DQContext): Boolean = {
- val sqlContext = context.sqlContext
+ val sparkSession = context.sparkSession
try {
val df = rule match {
- case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext,
inputDfName, details)
+ case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession,
inputDfName, details)
case DataFrameOps._accuracy =>
- DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId,
details)
+ DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId,
details)
- case DataFrameOps._clear => DataFrameOps.clear(sqlContext,
inputDfName, details)
+ case DataFrameOps._clear => DataFrameOps.clear(sparkSession,
inputDfName, details)
case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
}
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 00edf07..e5c18e3 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -31,9 +31,9 @@ case class SparkSqlTransformStep[T <: WriteStep](name: String,
cache: Boolean = false
) extends TransformStep {
def doExecute(context: DQContext): Boolean = {
- val sqlContext = context.sqlContext
+ val sparkSession = context.sparkSession
try {
- val df = sqlContext.sql(rule)
+ val df = sparkSession.sql(rule)
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
writeStepOpt match {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index d2805cf..92f6c00 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -47,7 +47,7 @@ case class DataSourceUpdateWriteStep(dsName: String,
private def getDataFrame(context: DQContext, name: String):
Option[DataFrame] = {
try {
- val df = context.sqlContext.table(s"`${name}`")
+ val df = context.sparkSession.table(s"`${name}`")
Some(df)
} catch {
case e: Throwable =>
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index c7ebae7..3f3eaf5 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -75,7 +75,7 @@ case class MetricWriteStep(name: String,
private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = {
try {
- val pdf = context.sqlContext.table(s"`${inputName}`")
+ val pdf = context.sparkSession.table(s"`${inputName}`")
val records = pdf.toJSON.collect()
if (records.size > 0) {
records.flatMap { rec =>
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 3dcbe90..44b05ff 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -76,7 +76,7 @@ case class RecordWriteStep(name: String,
private def getDataFrame(context: DQContext, name: String):
Option[DataFrame] = {
try {
- val df = context.sqlContext.table(s"`${name}`")
+ val df = context.sparkSession.table(s"`${name}`")
Some(df)
} catch {
case e: Throwable =>
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index ae0bfb8..0dec789 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -47,10 +47,9 @@ class BatchDQAppTest extends DQAppTest {
val logLevel = getGriffinLogLevel()
sc.setLogLevel(sparkParam.getLogLevel)
griffinLogger.setLevel(logLevel)
- val sqlContext = spark.sqlContext
// register udf
- GriffinUDFAgent.register(sqlContext)
+ GriffinUDFAgent.register(spark)
}
}