Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 5706bbf1d -> 70a7c71a3


add operations not supported in SQL


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/f42885bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/f42885bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/f42885bc

Branch: refs/heads/master
Commit: f42885bc5fe315217bf40bbbef21557c06f52586
Parents: 33e3d26
Author: Chul Kang <[email protected]>
Authored: Fri Jun 8 00:01:18 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Fri Jun 8 00:01:18 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/s2jobs/task/Process.scala    | 64 +++++++++++++++++++-
 1 file changed, 63 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f42885bc/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
index 6bc100f..c077f5b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Process.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.s2jobs.task
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import play.api.libs.json.Json
 
 /**
   * Process
@@ -47,7 +48,68 @@ class SqlProcess(conf:TaskConf) extends Process(conf) {
     val sql = conf.options("sql")
     logger.debug(s"${LOG_PREFIX} sql : $sql")
 
-    ss.sql(sql)
+    postProcess(ss.sql(sql))
   }
+
+  /**
+    * Applies watermark, dropDuplicates, and windowed aggregation driven by task options.
+    * @param df the DataFrame produced by the SQL query
+    * @return the DataFrame with the extra operations applied
+    */
+  private def postProcess(df: DataFrame): DataFrame = {
+    import org.apache.spark.sql.functions._
+
+    var resultDF = df
+
+    // watermark
+    val timeColumn = conf.options.get(s"time.column")
+    logger.debug(s">> timeColumn:  ${timeColumn}")
+
+    val waterMarkDelayTime = conf.options.get(s"watermark.delay.time")
+    if (waterMarkDelayTime.isDefined){
+      logger.debug(s">> waterMarkDelayTime : ${waterMarkDelayTime}")
+      if (timeColumn.isDefined) {
+        resultDF = resultDF.withWatermark(timeColumn.get, waterMarkDelayTime.get)
+      } else logger.warn("time.column does not exist; cannot apply watermark")
+    }
+
+    // drop duplication
+    val dropDuplicateColumns = conf.options.get("drop.duplicate.columns")
+    if (dropDuplicateColumns.isDefined) {
+      logger.debug(s">> dropDuplicates : ${dropDuplicateColumns}")
+      resultDF = resultDF.dropDuplicates(dropDuplicateColumns.get.split(","))
+    }
+
+    // groupBy
+    val groupedKeysOpt = conf.options.get(s"grouped.keys")
+    if (groupedKeysOpt.isDefined) {
+      var groupedKeys = groupedKeysOpt.get.split(",").map{ key =>
+        col(key.trim)
+      }.toSeq
+
+      val windowDurationOpt = conf.options.get(s"grouped.window.duration")
+      val slideDurationOpt = conf.options.get(s"grouped.slide.duration")
+      if (windowDurationOpt.isDefined && slideDurationOpt.isDefined){
+        logger.debug(s">> using window operation : Duration 
${windowDurationOpt}, slideDuration : ${slideDurationOpt}")
+        groupedKeys = groupedKeys ++ Seq(window(col(timeColumn.get), windowDurationOpt.get, slideDurationOpt.get))
+      }
+      logger.debug(s">> groupedKeys: ${groupedKeys}")
+
+      // aggregate options
+      val aggExprs = Json.parse(conf.options.getOrElse(s"grouped.dataset.agg", "[\"count(1)\"]")).as[Seq[String]].map(expr(_))
+      logger.debug(s">> aggr : ${aggExprs}")
+
+      val groupedDF = resultDF.groupBy(groupedKeys: _*)
+
+      resultDF = if (aggExprs.size > 1) {
+        groupedDF.agg(aggExprs.head, aggExprs.tail: _*)
+      } else {
+        groupedDF.agg(aggExprs.head)
+      }
+    }
+
+    resultDF
+  }
+
 }
 
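
For reference, a minimal sketch of how the new options read by postProcess() might be set on a SqlProcess task. The keys are taken from the diff above; the column names, durations, and aggregate expressions are made-up examples, and it is assumed (from the code) that conf.options behaves as a plain Map[String, String].

    // Hypothetical options for a SqlProcess task. The keys are the ones read
    // in postProcess(); the values are illustrative only.
    val options: Map[String, String] = Map(
      "sql"                     -> "SELECT user_id, event_time FROM events",
      // event-time column used for both the watermark and the window
      "time.column"             -> "event_time",
      // how long to wait for late data before dropping it
      "watermark.delay.time"    -> "10 minutes",
      // columns used to de-duplicate rows
      "drop.duplicate.columns"  -> "user_id,event_time",
      // grouping keys; a window(time.column, duration, slide) column is
      // appended when both window options below are present
      "grouped.keys"            -> "user_id",
      "grouped.window.duration" -> "1 hour",
      "grouped.slide.duration"  -> "10 minutes",
      // JSON array of aggregate expressions; defaults to ["count(1)"]
      "grouped.dataset.agg"     -> """["count(1)", "max(event_time)"]"""
    )

Note that the windowed grouping path calls timeColumn.get, so time.column must be set whenever grouped.window.duration and grouped.slide.duration are supplied.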

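For the options above, the post-processing roughly corresponds to the following plain Spark DataFrame chain (again a sketch with hypothetical column names, not the task code itself):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, expr, window}

    // watermark -> dropDuplicates -> windowed groupBy -> aggregation
    def postProcessEquivalent(df: DataFrame): DataFrame =
      df.withWatermark("event_time", "10 minutes")
        .dropDuplicates(Seq("user_id", "event_time"))
        .groupBy(col("user_id"), window(col("event_time"), "1 hour", "10 minutes"))
        .agg(expr("count(1)"), expr("max(event_time)"))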