batch pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ada8a0d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ada8a0d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ada8a0d4

Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: ada8a0d4b02a05793b220998cddeb1de5179d80d
Parents: 07fcb29
Author: Lionel Liu <[email protected]>
Authored: Mon Apr 16 22:07:09 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Mon Apr 16 22:07:09 2018 +0800

----------------------------------------------------------------------
 measure/pom.xml                                 |   2 +-
 .../data/source/cache/DataSourceCache.scala     |   2 +-
 .../data/source/cache/JsonDataSourceCache.scala |   4 +-
 .../data/source/cache/OrcDataSourceCache.scala  |   4 +-
 .../source/cache/ParquetDataSourceCache.scala   |   4 +-
 .../griffin/measure/persist/MultiPersists.scala |   2 +-
 .../griffin/measure/persist/Persist.scala       |   2 +-
 .../measure/process/BatchDqProcess.scala        |  27 +-
 .../measure/process/StreamingDqProcess.scala    |  26 +-
 .../process/engine/DataFrameOprEngine.scala     |  10 +-
 .../measure/process/engine/DqEngine.scala       |   2 +-
 .../measure/process/engine/DqEngines.scala      |   2 +-
 .../measure/process/engine/SparkDqEngine.scala  |   9 +-
 .../measure/process/engine/SparkSqlEngine.scala |   2 +-
 .../measure/rule/adaptor/RuleAdaptorGroup.scala |  17 +-
 .../griffin/measure/util/MessageUtil.java       | 320 +++++++++----------
 16 files changed, 233 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index 42fc967..2dffd35 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -117,7 +117,7 @@ under the License.
         <dependency>
             <groupId>com.databricks</groupId>
             <artifactId>spark-avro_${scala.binary.version}</artifactId>
-            <version>2.0.1</version>
+            <version>4.0.0</version>
         </dependency>
         <!--csv-->
         <dependency>
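The spark-avro bump from 2.0.1 to 4.0.0 tracks the Spark 1.6 -> 2.x migration in the rest of this commit: the 2.0.x line of com.databricks:spark-avro builds against Spark 1.x, while 4.0.0 targets Spark 2.x. A minimal sketch of the 4.x usage on a Spark 2.x session (the paths are hypothetical, not from this commit):

    // Sketch only: Avro round trip with spark-avro 4.x; paths are hypothetical.
    import com.databricks.spark.avro._
    import org.apache.spark.sql.SparkSession

    object AvroRoundTrip {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("avro-demo").getOrCreate()
        val df = spark.read.avro("hdfs:///tmp/source.avro") // implicit from spark.avro._
        df.write.avro("hdfs:///tmp/target.avro")
        spark.stop()
      }
    }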
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index 2412130..f70bd11 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -98,7 +98,7 @@ trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable w
 
   val defOldCacheIndex = 0L
 
-  protected def writeDataFrame(dfw: DataFrameWriter, path: String): Unit
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
   protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
 
   def init(): Unit = {}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
index e284d47..2fa5316 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
  */
 package org.apache.griffin.measure.data.source.cache
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
 
 case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                                dsName: String, index: Int
@@ -28,7 +28,7 @@ case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 //    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
   }
 
-  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     println(s"write path: ${path}")
     dfw.json(path)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
index 7b92bef..5bf2500 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
  */
 package org.apache.griffin.measure.data.source.cache
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
 
 case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                               dsName: String, index: Int
@@ -28,7 +28,7 @@ case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 //    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
   }
 
-  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     println(s"write path: ${path}")
     dfw.orc(path)
   }
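The writeDataFrame signature changes above are forced by Spark 2.x typing: DataFrame is now an alias for Dataset[Row], and df.write returns a DataFrameWriter[Row], so the abstract method in DataSourceCache must carry the type parameter. A sketch of the pattern, with a hypothetical CSV cache added for illustration (CsvCache is not part of this commit):

    // Sketch of the trait/implementation pattern used by the caches above,
    // assuming Spark 2.x. CsvCache is hypothetical, for illustration only.
    import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}

    trait FormatCache {
      protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
      protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
    }

    class CsvCache extends FormatCache {
      protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit =
        dfw.csv(path) // analogous to dfw.json / dfw.orc / dfw.parquet above
      protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame =
        dfr.csv(path)
    }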
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
index 89cd0b7..f39d832 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
  */
 package org.apache.griffin.measure.data.source.cache
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
 
 case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                                   dsName: String, index: Int
@@ -28,7 +28,7 @@ case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any
     sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
   }
 
-  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     println(s"write path: ${path}")
     dfw.parquet(path)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index bed28fd..4b30cba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.persist
 
 import org.apache.griffin.measure.result._
 import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Dataset}
 
 import scala.util.Try

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
index 361fad7..300f2c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.persist
 
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.result._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Dataset}
 
 import scala.util.Try
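MultiPersists and Persist only gain a Dataset import here; the reason is the Spark 2.x type alias that makes every DataFrame a Dataset[Row], so persist signatures touching DataFrames also need Dataset in scope. A one-liner to make the relationship concrete (assumption: Spark 2.x):

    // In Spark 2.x the org.apache.spark.sql package object defines, in effect:
    //   type DataFrame = Dataset[Row]
    // so a DataFrame can be passed wherever a Dataset[Row] is expected.
    import org.apache.spark.sql.{DataFrame, Dataset, Row}

    def asDataset(df: DataFrame): Dataset[Row] = df // compiles: same type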
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 2770de8..04b608a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -30,9 +30,8 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters,
 import org.apache.griffin.measure.rule.adaptor._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.udf._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.SparkConf
 
 import scala.util.Try
 
@@ -46,24 +45,31 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
   val dataSourceNames = userParam.dataSources.map(_.name)
   val baselineDsName = userParam.baselineDsName
 
-  var sparkContext: SparkContext = _
+//  var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
+  var sparkSession: SparkSession = _
+
   def retriable: Boolean = false
 
   def init: Try[_] = Try {
     val conf = new SparkConf().setAppName(metricName)
     conf.setAll(sparkParam.config)
-    sparkContext = new SparkContext(conf)
-    sparkContext.setLogLevel(sparkParam.logLevel)
-    sqlContext = new HiveContext(sparkContext)
+    conf.set("spark.sql.crossJoin.enabled", "true")
+    sparkSession = if (conf.contains("hive.metastore.uris")) {
+      SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+    } else {
+      SparkSession.builder().config(conf).getOrCreate()
+    }
+    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = sparkSession.sqlContext
 
     // register udf
     GriffinUdfs.register(sqlContext)
     GriffinUdafs.register(sqlContext)
 
     // init adaptors
-    RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
+    RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
   }
 
   def run: Try[_] = Try {
@@ -78,7 +84,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     val persist: Persist = persistFactory.getPersists(appTime)
 
     // persist start id
-    val applicationId = sparkContext.applicationId
+    val applicationId = sparkSession.sparkContext.applicationId
     persist.start(applicationId)
 
     // get dq engines
@@ -146,7 +152,8 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     DataFrameCaches.uncacheGlobalDataFrames()
     DataFrameCaches.clearGlobalTrashDataFrames()
 
-    sparkContext.stop
+    sparkSession.close()
+    sparkSession.stop()
   }
 
 //  private def cleanData(t: Long): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index b2af46a..b7ec449 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
 import org.apache.griffin.measure.rule.udf._
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.{SparkConf, SparkContext}
@@ -46,17 +46,24 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
   val dataSourceNames = userParam.dataSources.map(_.name)
   val baselineDsName = userParam.baselineDsName
 
-  var sparkContext: SparkContext = _
+//  var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
+  var sparkSession: SparkSession = _
+
   def retriable: Boolean = true
 
   def init: Try[_] = Try {
     val conf = new SparkConf().setAppName(metricName)
     conf.setAll(sparkParam.config)
-    sparkContext = new SparkContext(conf)
-    sparkContext.setLogLevel(sparkParam.logLevel)
-    sqlContext = new HiveContext(sparkContext)
+    conf.set("spark.sql.crossJoin.enabled", "true")
+    sparkSession = if (conf.contains("hive.metastore.uris")) {
+      SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+    } else {
+      SparkSession.builder().config(conf).getOrCreate()
+    }
+    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = sparkSession.sqlContext
 
     // clear checkpoint directory
     clearCpDir
@@ -71,7 +78,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
 
     // init adaptors
     val dataSourceNames = userParam.dataSources.map(_.name)
-    RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
+    RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
   }
 
   def run: Try[_] = Try {
@@ -94,7 +101,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     val persist: Persist = persistFactory.getPersists(appTime)
 
     // persist start id
-    val applicationId = sparkContext.applicationId
+    val applicationId = sparkSession.sparkContext.applicationId
     persist.start(applicationId)
 
     // get dq engines
@@ -149,7 +156,8 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     DataFrameCaches.uncacheGlobalDataFrames()
     DataFrameCaches.clearGlobalTrashDataFrames()
 
-    sparkContext.stop
+    sparkSession.close()
+    sparkSession.stop()
 
     InfoCacheInstance.close
   }
@@ -159,7 +167,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
       case Some(interval) => Milliseconds(interval)
       case _ => throw new Exception("invalid batch interval")
     }
-    val ssc = new StreamingContext(sparkContext, batchInterval)
+    val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
     ssc.checkpoint(sparkParam.cpDir)
     ssc
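The two process classes above carry the core of this commit: the SparkContext plus HiveContext pair is replaced by a single SparkSession, Hive support is enabled only when a metastore URI is configured, and spark.sql.crossJoin.enabled is switched on because Spark 2.x rejects implicit cross joins that Spark 1.6 allowed. Condensed into a helper, with the same logic as the init blocks above:

    // Condensed from the init blocks above (Spark 2.x); same conditional
    // as BatchDqProcess/StreamingDqProcess, factored out for readability.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    def createSession(conf: SparkConf, logLevel: String): SparkSession = {
      // Spark 2.x fails queries containing implicit cross joins unless enabled.
      conf.set("spark.sql.crossJoin.enabled", "true")
      val builder = SparkSession.builder().config(conf)
      val session =
        if (conf.contains("hive.metastore.uris")) builder.enableHiveSupport().getOrCreate()
        else builder.getOrCreate()
      session.sparkContext.setLogLevel(logLevel)
      session
    }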
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
index 59b765e..600da45 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -38,6 +38,8 @@ import org.apache.griffin.measure.utils.ParamUtil._
 
 import scala.util.Try
 
+import org.apache.spark.sql._
+
 case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
 
   def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
@@ -85,9 +87,11 @@ object DataFrameOprs {
     val dfName = details.getOrElse(_dfName, "").toString
     val colNameOpt = details.get(_colName).map(_.toString)
 
-    val df = sqlContext.table(s"`${dfName}`")
+    implicit val encoder = Encoders.STRING
+
+    val df: DataFrame = sqlContext.table(s"`${dfName}`")
     val rdd = colNameOpt match {
-      case Some(colName: String) => df.map(_.getAs[String](colName))
+      case Some(colName: String) => df.map(r => r.getAs[String](colName))
       case _ => df.map(_.getAs[String](0))
     }
     sqlContext.read.json(rdd) // slow process
@@ -116,6 +120,8 @@ object DataFrameOprs {
       }
     }
 
+    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.bean(classOf[AccuracyResult]))
+
     val df = sqlContext.table(s"`${dfName}`")
 
     val results = df.flatMap { row =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index 3d77458..a2de69d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.process.ProcessType
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 trait DqEngine extends Loggable with Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 6b9a215..a3ef7dc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -31,7 +31,7 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{DsUpdate, _}
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 //import scala.concurrent._
 //import scala.concurrent.duration.Duration
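The encoder additions in DataFrameOprEngine follow from the Dataset API: in Spark 1.6, DataFrame.map returned an RDD, but in Spark 2.x it returns a Dataset[T] and requires an implicit Encoder[T]. A sketch of the pattern (the table "t" and column "value" are hypothetical stand-ins):

    // Sketch, assuming Spark 2.x and a registered table "t" with a string
    // column "value" (both hypothetical).
    import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}

    def jsonColumnToDf(spark: SparkSession): DataFrame = {
      implicit val encoder = Encoders.STRING // needed by Dataset.map
      val strings = spark.table("t").map(r => r.getAs[String]("value"))
      spark.read.json(strings.rdd)           // read.json takes RDD[String] here
    }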
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index 382e302..4ba185f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql._
 import org.apache.griffin.measure.utils.ParamUtil._
 
 trait SparkDqEngine extends DqEngine {
@@ -115,10 +115,11 @@ trait SparkDqEngine extends DqEngine {
   }
 
   def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
-    getRecordDataFrame(recordExport).map(_.toJSON)
+    getRecordDataFrame(recordExport).map(_.toJSON.rdd)
   }
 
   def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
     val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
     getRecordDataFrame(recordExport) match {
       case Some(stepDf) => {
@@ -149,7 +150,7 @@ trait SparkDqEngine extends DqEngine {
             }
           } else None
         }
-        (Some(records.groupByKey), emptyTmsts)
+        (Some(records.rdd.groupByKey), emptyTmsts)
       } else (None, emptyTmsts)
     }
     case _ => {
@@ -163,7 +164,7 @@ trait SparkDqEngine extends DqEngine {
           case e: Throwable => None
         }
       }
-      (Some(records.groupByKey), Set[Long]())
+      (Some(records.rdd.groupByKey), Set[Long]())
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 438595b..e4ecb49 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext}
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.streaming.StreamingContext
 
 case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
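The .rdd insertions in SparkDqEngine deal with two more Spark 2.x differences: Dataset.toJSON now returns Dataset[String] instead of RDD[String], and Dataset.groupByKey is a typed-aggregation API rather than RDD-style grouping, so the code drops to the underlying RDD before grouping. A sketch (the "tmst" column is a hypothetical stand-in for Griffin's internal timestamp):

    // Sketch, assuming Spark 2.x; "tmst" is a hypothetical timestamp column.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Encoders}

    def toJsonRecords(df: DataFrame): RDD[String] =
      df.toJSON.rdd // Dataset[String] -> RDD[String]

    def groupRecords(df: DataFrame): RDD[(Long, Iterable[String])] = {
      implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
      val pairs = df.map(r => (r.getAs[Long]("tmst"), r.mkString(", ")))
      pairs.rdd.groupByKey // RDD groupByKey, as in collectStreamingRecords
    }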
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
index 30a356c..5b5f419 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.process.ProcessType
 import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 
 import scala.collection.mutable.{Map => MutableMap}
 
@@ -46,14 +46,23 @@ object RuleAdaptorGroup {
     functionNames = funcNames
   }
 
-  def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = {
-    val functions = sqlContext.sql("show functions")
-    functionNames = functions.map(_.getString(0)).collect.toSeq
+  def init(sparkSession: SparkSession, dsNames: Seq[String], blDsName: String): Unit = {
+    implicit val encoder = Encoders.STRING
+    val functions = sparkSession.catalog.listFunctions
+    functionNames = functions.map(_.name).collect.toSeq
     dataSourceNames = dsNames
 
     baselineDsName = blDsName
   }
 
+//  def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = {
+//    val functions = sqlContext.sql("show functions")
+//    functionNames = functions.map(_.getString(0)).collect.toSeq
+//    dataSourceNames = dsNames
+//
+//    baselineDsName = blDsName
+//  }
+
   private def getDslType(param: Map[String, Any], defDslType: DslType) = {
     DslType(param.getOrElse(_dslType, defDslType.desc).toString)
   }
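RuleAdaptorGroup.init now uses the typed catalog API instead of the "show functions" SQL round trip; listFunctions returns a Dataset of org.apache.spark.sql.catalog.Function, so mapping to names again needs a string encoder. The resulting idiom, isolated (assuming Spark 2.x):

    // The listFunctions idiom from RuleAdaptorGroup.init, isolated; Spark 2.x.
    import org.apache.spark.sql.{Encoders, SparkSession}

    def listFunctionNames(spark: SparkSession): Seq[String] = {
      implicit val encoder = Encoders.STRING // for the .map to names
      spark.catalog.listFunctions.map(_.name).collect.toSeq
    }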
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
index 6fba5ff..4f15b62 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
+++ b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
@@ -16,163 +16,163 @@ KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.griffin.measure.util;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.StringUtils;
-import java.util.ArrayList;
-import java.io.*;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.*;
-import java.util.*;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-import org.apache.griffin.measure.config.params.env.SMSParam;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Created by xiaoqiu.duan on 2017/9/11.
- */
-public class MessageUtil {
-
-
-    public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){
-        String url=smsParam.host();
-        String SYS_ID=smsParam.id();
-        String KEY=smsParam.key();
-        String sendContext="["+smsParam.uuid()+"]: "+content ;
-        System.out.println(" sendContext: "+sendContext);
-        Long timestamp=new Date().getTime()+20610;
-        System.out.println(" timestamp: "+timestamp);
-        String[] tels=teamPhone.split(",") ;
-        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
-        Map<String,Object> param=new HashMap<String, Object>();
-        param.put("apportionment","JR_DATA_ALARM");
-        param.put("event",0);
-        param.put("eventTime",0);
-        param.put("isMasked",false);
-        param.put("originator","ETC");
-        param.put("reqId",uuid);
-        param.put("schemaKey","");
-        param.put("sysId",SYS_ID);
-        param.put("template",sendContext);
-        param.put("templateId","");
-        param.put("timestamp",timestamp);
-        param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
-        param.put("typeCode","JR_DATA_ALARM");
-        System.out.println("params: "+param);
-        List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>();
-        for (int i=0;i<tels.length;i++) {
-            Map<String, Object> body = new HashMap<String, Object>();
-            body.put("phoneNo", tels[i]);
-            body.put("params", "");
-            body.put("userId", 0);
-            bodys.add(body);
-        }
-        System.out.println("bodys: "+bodys);
-        JSONObject jsonParam = new JSONObject();
-        try {
-            jsonParam.put("params",param);
-            jsonParam.put("bodys",bodys);
-            System.out.println("jsonParam: "+jsonParam);
-            System.out.println("jsonParam: "+jsonParam.toString());
-        } catch (JSONException e) {
-            e.printStackTrace();
-        }
-        URL u=null;
-        int smsnum=0;
-        HttpURLConnection connection=null;
-        try{
-            String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
-            return "send success";
-        }catch(Exception e){
-            e.printStackTrace();
-            return null;
-        }
-
-    }
-
-
-    public static String postRequestUrl(String url, String param,String encode) {
-        OutputStreamWriter out = null;
-        BufferedReader reader = null;
-        String response="";
-        try {
-            URL httpUrl = null;
-            httpUrl = new URL(url);
-            HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
-            conn.setRequestMethod("POST");
-            conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded
-            conn.setRequestProperty("connection", "keep-alive");
-            conn.setUseCaches(false);
-            conn.setInstanceFollowRedirects(true);
-            conn.setDoOutput(true);
-            conn.setDoInput(true);
-            conn.connect();
-            //POST
-            out = new OutputStreamWriter(
-                    conn.getOutputStream());
-            out.write(param);
-
-            out.flush();
-
-            System.out.println("send POST "+conn.getResponseCode());
-            reader = new BufferedReader(new InputStreamReader(
-                    conn.getInputStream()));
-
-            String lines;
-            while ((lines = reader.readLine()) != null) {
-                lines = new String(lines.getBytes(), "utf-8");
-                response+=lines;
-                System.out.println("lines: "+lines);
-            }
-            reader.close();
-            conn.disconnect();
-
-            //log.info(response.toString());
-        } catch (Exception e) {
-            System.out.println("send POST error："+e);
-            e.printStackTrace();
-        }
-        finally{
-            try{
-                if(out!=null){
-                    out.close();
-                }
-                if(reader!=null){
-                    reader.close();
-                }
-            }
-            catch(IOException ex){
-                ex.printStackTrace();
-            }
-        }
-
-        return response;
-    }
-
-
-
-    private static String readBufferedContent(BufferedReader bufferedReader) {
-        if (bufferedReader == null)
-            return null;
-        StringBuffer result = new StringBuffer();
-        String line = null;
-        try {
-            while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) {
-                result.append(line+"\r\n");
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-            return null;
-        }
-        return result.toString();
-    }
-
-}
+//package org.apache.griffin.measure.util;
+//
+//import java.io.IOException;
+//
+//import org.apache.commons.lang.StringUtils;
+//import java.util.ArrayList;
+//import java.io.*;
+//import java.io.BufferedReader;
+//import java.io.InputStreamReader;
+//import java.net.*;
+//import java.util.*;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//
+//import org.apache.griffin.measure.config.params.env.SMSParam;
+//import org.json.JSONException;
+//import org.json.JSONObject;
+//
+///**
+// * Created by xiaoqiu.duan on 2017/9/11.
+// */
+//public class MessageUtil {
+//
+//
+//    public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){
+//        String url=smsParam.host();
+//        String SYS_ID=smsParam.id();
+//        String KEY=smsParam.key();
+//        String sendContext="["+smsParam.uuid()+"]: "+content ;
+//        System.out.println(" sendContext: "+sendContext);
+//        Long timestamp=new Date().getTime()+20610;
+//        System.out.println(" timestamp: "+timestamp);
+//        String[] tels=teamPhone.split(",") ;
+//        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
+//        Map<String,Object> param=new HashMap<String, Object>();
+//        param.put("apportionment","JR_DATA_ALARM");
+//        param.put("event",0);
+//        param.put("eventTime",0);
+//        param.put("isMasked",false);
+//        param.put("originator","ETC");
+//        param.put("reqId",uuid);
+//        param.put("schemaKey","");
+//        param.put("sysId",SYS_ID);
+//        param.put("template",sendContext);
+//        param.put("templateId","");
+//        param.put("timestamp",timestamp);
+//        param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
+//        param.put("typeCode","JR_DATA_ALARM");
+//        System.out.println("params: "+param);
+//        List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>();
+//        for (int i=0;i<tels.length;i++) {
+//            Map<String, Object> body = new HashMap<String, Object>();
+//            body.put("phoneNo", tels[i]);
+//            body.put("params", "");
+//            body.put("userId", 0);
+//            bodys.add(body);
+//        }
+//        System.out.println("bodys: "+bodys);
+//        JSONObject jsonParam = new JSONObject();
+//        try {
+//            jsonParam.put("params",param);
+//            jsonParam.put("bodys",bodys);
+//            System.out.println("jsonParam: "+jsonParam);
+//            System.out.println("jsonParam: "+jsonParam.toString());
+//        } catch (JSONException e) {
+//            e.printStackTrace();
+//        }
+//        URL u=null;
+//        int smsnum=0;
+//        HttpURLConnection connection=null;
+//        try{
+//            String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
+//            return "send success";
+//        }catch(Exception e){
+//            e.printStackTrace();
+//            return null;
+//        }
+//
+//    }
+//
+//
+//    public static String postRequestUrl(String url, String param,String encode) {
+//        OutputStreamWriter out = null;
+//        BufferedReader reader = null;
+//        String response="";
+//        try {
+//            URL httpUrl = null;
+//            httpUrl = new URL(url);
+//            HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
+//            conn.setRequestMethod("POST");
+//            conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded
+//            conn.setRequestProperty("connection", "keep-alive");
+//            conn.setUseCaches(false);
+//            conn.setInstanceFollowRedirects(true);
+//            conn.setDoOutput(true);
+//            conn.setDoInput(true);
+//            conn.connect();
+//            //POST
+//            out = new OutputStreamWriter(
+//                    conn.getOutputStream());
+//            out.write(param);
+//
+//            out.flush();
+//
+//            System.out.println("send POST "+conn.getResponseCode());
+//            reader = new BufferedReader(new InputStreamReader(
+//                    conn.getInputStream()));
+//
+//            String lines;
+//            while ((lines = reader.readLine()) != null) {
+//                lines = new String(lines.getBytes(), "utf-8");
+//                response+=lines;
+//                System.out.println("lines: "+lines);
+//            }
+//            reader.close();
+//            conn.disconnect();
+//
+//            //log.info(response.toString());
+//        } catch (Exception e) {
+//            System.out.println("send POST error："+e);
+//            e.printStackTrace();
+//        }
+//        finally{
+//            try{
+//                if(out!=null){
+//                    out.close();
+//                }
+//                if(reader!=null){
+//                    reader.close();
+//                }
+//            }
+//            catch(IOException ex){
+//                ex.printStackTrace();
+//            }
+//        }
+//
+//        return response;
+//    }
+//
+//
+//
+//    private static String readBufferedContent(BufferedReader bufferedReader) {
+//        if (bufferedReader == null)
+//            return null;
+//        StringBuffer result = new StringBuffer();
+//        String line = null;
+//        try {
+//            while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) {
+//                result.append(line+"\r\n");
+//            }
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//            return null;
+//        }
+//        return result.toString();
+//    }
+//
+//}
