Repository: incubator-s2graph Updated Branches: refs/heads/master 5706bbf1d -> 70a7c71a3
add operations not supported on sql Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f42885bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f42885bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f42885bc Branch: refs/heads/master Commit: f42885bc5fe315217bf40bbbef21557c06f52586 Parents: 33e3d26 Author: Chul Kang <[email protected]> Authored: Fri Jun 8 00:01:18 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Fri Jun 8 00:01:18 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/s2jobs/task/Process.scala | 64 +++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f42885bc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala index 6bc100f..c077f5b 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.s2jobs.task import org.apache.spark.sql.{DataFrame, SparkSession} +import play.api.libs.json.Json /** * Process @@ -47,7 +48,68 @@ class SqlProcess(conf:TaskConf) extends Process(conf) { val sql = conf.options("sql") logger.debug(s"${LOG_PREFIX} sql : $sql") - ss.sql(sql) + postProcess(ss.sql(sql)) } + + /** + * extraOperations + * @param df + * @return + */ + private def postProcess(df: DataFrame): DataFrame = { + import org.apache.spark.sql.functions._ + + var resultDF = df + + // watermark + val timeColumn = conf.options.get(s"time.column") + logger.debug(s">> timeColumn: ${timeColumn}") + + val waterMarkDelayTime = conf.options.get(s"watermark.delay.time") + if (waterMarkDelayTime.isDefined){ + logger.debug(s">> waterMarkDelayTime : ${waterMarkDelayTime}") + if (timeColumn.isDefined) { + resultDF = resultDF.withWatermark(timeColumn.get, waterMarkDelayTime.get) + } else logger.warn("time.column does not exists.. cannot apply watermark") + } + + // drop duplication + val dropDuplicateColumns = conf.options.get("drop.duplicate.columns") + if (dropDuplicateColumns.isDefined) { + logger.debug(s">> dropDuplicates : ${dropDuplicateColumns}") + resultDF = resultDF.dropDuplicates(dropDuplicateColumns.get.split(",")) + } + + // groupBy + val groupedKeysOpt = conf.options.get(s"grouped.keys") + if (groupedKeysOpt.isDefined) { + var groupedKeys = groupedKeysOpt.get.split(",").map{ key => + col(key.trim) + }.toSeq + + val windowDurationOpt = conf.options.get(s"grouped.window.duration") + val slideDurationOpt = conf.options.get(s"grouped.slide.duration") + if (windowDurationOpt.isDefined && slideDurationOpt.isDefined){ + logger.debug(s">> using window operation : Duration ${windowDurationOpt}, slideDuration : ${slideDurationOpt}") + groupedKeys = groupedKeys ++ Seq(window(col(timeColumn.get), windowDurationOpt.get, slideDurationOpt.get)) + } + logger.debug(s">> groupedKeys: ${groupedKeys}") + + // aggregate options + val aggExprs = Json.parse(conf.options.getOrElse(s"grouped.dataset.agg", "[\"count(1)\"]")).as[Seq[String]].map(expr(_)) + logger.debug(s">> aggr : ${aggExprs}") + + val groupedDF = resultDF.groupBy(groupedKeys: _*) + + resultDF = if (aggExprs.size > 1) { + groupedDF.agg(aggExprs.head, aggExprs.tail: _*) + } else { + groupedDF.agg(aggExprs.head) + } + } + + resultDF + } + }
