add schema option for grok udf

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0ae2cb25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0ae2cb25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0ae2cb25

Branch: refs/heads/master
Commit: 0ae2cb2516264f37d07425bb7b59981aabd8600a
Parents: 26321ac
Author: Chul Kang <[email protected]>
Authored: Tue Jul 24 14:41:14 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Tue Jul 24 14:41:14 2018 +0900

----------------------------------------------------------------------
 project/Common.scala                             |  2 +-
 .../org/apache/s2graph/s2jobs/udfs/Grok.scala    | 19 +++++++++++++++++--
 .../apache/s2graph/s2jobs/utils/GrokHelper.scala | 15 +++++++++++++++
 3 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0ae2cb25/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index 08552a8..af121fc 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -29,7 +29,7 @@ object Common {
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
-  val elastic4sVersion = "6.1.1"
+  val elastic4sVersion = "6.2.4"
 
   val KafkaVersion = "0.10.2.1"
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0ae2cb25/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
index 6ed2f00..ebcb41d 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
@@ -2,20 +2,35 @@ package org.apache.s2graph.s2jobs.udfs
 
 import org.apache.s2graph.s2jobs.utils.GrokHelper
 import org.apache.spark.sql.SparkSession
-import play.api.libs.json.Json
+import org.apache.spark.sql.types.{DataType, StructType}
+import play.api.libs.json.{JsValue, Json}
 
 class Grok extends Udf {
+  import org.apache.spark.sql.functions.udf
+
   def register(ss: SparkSession, name:String, options:Map[String, String]) = {
     // grok
     val patternDir = options.getOrElse("patternDir", "/tmp")
     val patternFiles = options.getOrElse("patternFiles", "").split(",").toSeq
     val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
     val compilePattern = options("compilePattern")
+    val schemaOpt = options.get("schema")
 
     patternFiles.foreach { patternFile =>
       ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
     }
+
     implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
-    ss.udf.register(name, GrokHelper.grokMatch _)
+
+    val f = if(schemaOpt.isDefined) {
+      val schema = DataType.fromJson(schemaOpt.get)
+      implicit val keys:Array[String] = schema.asInstanceOf[StructType].fieldNames
+      udf(GrokHelper.grokMatchWithSchema _, schema)
+    } else {
+      udf(GrokHelper.grokMatch _)
+    }
+
+
+    ss.udf.register(name, f)
   }
 }

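For context, a minimal usage sketch of the new "schema" option (not part of the commit; the grok pattern, column names, "logs" table, and schema JSON below are illustrative assumptions, while the option keys and the DataType.fromJson JSON format come from the code above):

  // Hypothetical job-side usage of the Grok UDF with a schema.
  val options = Map(
    "patternDir"     -> "/tmp",
    "patternFiles"   -> "grok-patterns",            // shipped to executors via sparkContext.addFile
    "compilePattern" -> "%{IP:ip} %{WORD:verb}",    // illustrative compile pattern
    // Spark SQL DataType JSON, parsed by DataType.fromJson in register()
    "schema"         -> """{"type":"struct","fields":[
      {"name":"ip","type":"string","nullable":true,"metadata":{}},
      {"name":"verb","type":"string","nullable":true,"metadata":{}}]}"""
  )
  new Grok().register(ss, "grok", options)

  // With a schema the UDF yields a typed struct column instead of a Map[String, String]:
  ss.sql("SELECT grok(line).ip, grok(line).verb FROM logs")
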
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0ae2cb25/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
index 01ecf65..37485c8 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -3,6 +3,7 @@ package org.apache.s2graph.s2jobs.utils
 import io.thekraken.grok.api.Grok
 import org.apache.s2graph.s2jobs.Logger
 import org.apache.spark.SparkFiles
+import org.apache.spark.sql.Row
 
 import scala.collection.mutable
 
@@ -41,4 +42,18 @@ object GrokHelper extends Logger {
       .map{ case (k, v) =>  k -> v.toString}
     if (rstMap.isEmpty) None else Some(rstMap)
   }
+
+  def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+
+    val rstMap = m.toMap.asScala.toMap
+    if (rstMap.isEmpty) None
+    else {
+      val l = keys.map { key => rstMap.getOrElse(key, null)}
+      Some(Row.fromSeq(l))
+    }
+  }
 }
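
To illustrate the Row construction in grokMatchWithSchema above (a sketch with assumed inputs; "ip" and "verb" are hypothetical capture names):

  val keys   = Array("ip", "verb")                       // schema.fieldNames, in schema order
  val rstMap = Map("ip" -> "127.0.0.1")                  // captures for one input line
  val values = keys.map(k => rstMap.getOrElse(k, null))  // absent captures become null
  // values == Array("127.0.0.1", null); Row.fromSeq(values) lines up with the StructType fields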
