add grok udf

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/26321aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/26321aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/26321aca

Branch: refs/heads/master
Commit: 26321aca24afbfbf0f44bd197d91d017e06e1ac0
Parents: d67adc5
Author: Chul Kang <[email protected]>
Authored: Thu Jun 21 14:33:30 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Jun 21 14:33:30 2018 +0900

----------------------------------------------------------------------
 s2jobs/build.sbt                                |  1 +
 .../org/apache/s2graph/s2jobs/udfs/Grok.scala   | 21 ++++++++++
 .../s2graph/s2jobs/utils/GrokHelper.scala       | 44 ++++++++++++++++++++
 3 files changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/26321aca/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index f647040..47ec835 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,6 +38,7 @@ libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
   "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
   "com.github.scopt" %% "scopt" % "3.7.0",
+  "io.thekraken" % "grok" % "0.1.5",
   "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/26321aca/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
new file mode 100644
index 0000000..6ed2f00
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
@@ -0,0 +1,21 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.utils.GrokHelper
+import org.apache.spark.sql.SparkSession
+import play.api.libs.json.Json
+
+class Grok extends Udf {
+  def register(ss: SparkSession, name:String, options:Map[String, String]) = {
+    // grok
+    val patternDir = options.getOrElse("patternDir", "/tmp")
+    val patternFiles = options.getOrElse("patternFiles", "").split(",").toSeq
+    val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
+    val compilePattern = options("compilePattern")
+
+    patternFiles.foreach { patternFile =>
+      ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
+    }
+    implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
+    ss.udf.register(name, GrokHelper.grokMatch _)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/26321aca/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
new file mode 100644
index 0000000..01ecf65
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -0,0 +1,44 @@
+package org.apache.s2graph.s2jobs.utils
+
+import io.thekraken.grok.api.Grok
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.SparkFiles
+
+import scala.collection.mutable
+
+object GrokHelper extends Logger {
+  private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty
+
+  def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok = {
+    if (grokPool.get(name).isEmpty) {
+      println(s"Grok '$name' initialized..")
+      val grok = new Grok()
+      patternFiles.foreach { patternFile =>
+        val filePath = SparkFiles.get(patternFile)
+        println(s"[Grok][$name] add pattern file : $patternFile  ($filePath)")
+        grok.addPatternFromFile(filePath)
+      }
+      patterns.foreach { case (name, pattern) =>
+        println(s"[Grok][$name] add pattern : $name ($pattern)")
+        grok.addPattern(name, pattern)
+      }
+
+      grok.compile(compilePattern)
+      println(s"[Grok][$name] patterns: ${grok.getPatterns}")
+      grokPool.put(name, grok)
+    }
+
+    grokPool(name)
+  }
+
+  def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+    val rstMap = m.toMap.asScala.toMap
+      .filter(_._2 != null)
+      .map{ case (k, v) =>  k -> v.toString}
+    if (rstMap.isEmpty) None else Some(rstMap)
+  }
+}

Reply via email to