add grok udf
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/26321aca Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/26321aca Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/26321aca Branch: refs/heads/master Commit: 26321aca24afbfbf0f44bd197d91d017e06e1ac0 Parents: d67adc5 Author: Chul Kang <[email protected]> Authored: Thu Jun 21 14:33:30 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Thu Jun 21 14:33:30 2018 +0900 ---------------------------------------------------------------------- s2jobs/build.sbt | 1 + .../org/apache/s2graph/s2jobs/udfs/Grok.scala | 21 ++++++++++ .../s2graph/s2jobs/utils/GrokHelper.scala | 44 ++++++++++++++++++++ 3 files changed, 66 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/26321aca/s2jobs/build.sbt ---------------------------------------------------------------------- diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt index f647040..47ec835 100644 --- a/s2jobs/build.sbt +++ b/s2jobs/build.sbt @@ -38,6 +38,7 @@ libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion, "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion, "com.github.scopt" %% "scopt" % "3.7.0", + "io.thekraken" % "grok" % "0.1.5", "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test ) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/26321aca/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala new file mode 100644 index 0000000..6ed2f00 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala @@ -0,0 +1,21 @@ +package 
org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.utils.GrokHelper
+import org.apache.spark.sql.SparkSession
+import play.api.libs.json.Json
+
+/**
+ * UDF that applies a compiled Grok expression to a text value and returns the captured fields.
+ *
+ * Recognized options:
+ *  - `patternDir`: directory holding the pattern files (default `/tmp`)
+ *  - `patternFiles`: comma-separated pattern file names inside `patternDir` (default: none)
+ *  - `patterns`: JSON object of additional `name -> pattern` definitions (default `{}`)
+ *  - `compilePattern`: the Grok expression to compile (required)
+ */
+class Grok extends Udf {
+  /**
+   * Ships the pattern files through the SparkContext, builds the Grok for `name` and
+   * registers `GrokHelper.grokMatch` as a SQL function under `name`.
+   *
+   * @param ss      session on which the UDF is registered
+   * @param name    UDF name; also the key under which the compiled Grok is cached
+   * @param options see the class documentation
+   * @throws NoSuchElementException if `compilePattern` is not present in `options`
+   */
+  def register(ss: SparkSession, name:String, options:Map[String, String]) = {
+    val patternDir = options.getOrElse("patternDir", "/tmp")
+    // "".split(",") yields Seq(""), so blank entries are dropped; otherwise a missing
+    // `patternFiles` option would pass the bare "<patternDir>/" path to addFile.
+    val patternFiles = options.getOrElse("patternFiles", "")
+      .split(",").map(_.trim).filter(_.nonEmpty).toSeq
+    val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
+    val compilePattern = options.getOrElse(
+      "compilePattern",
+      throw new NoSuchElementException(s"[Grok][$name] option 'compilePattern' is required")
+    )
+
+    patternFiles.foreach { patternFile =>
+      ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
+    }
+    // Resolved implicitly by the eta-expansion of grokMatch below.
+    implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
+    ss.udf.register(name, GrokHelper.grokMatch _)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/26321aca/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
new file mode 100644
index 0000000..01ecf65
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -0,0 +1,44 @@
+package org.apache.s2graph.s2jobs.utils
+
+import io.thekraken.grok.api.Grok
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.SparkFiles
+
+import scala.collection.mutable
+
+/** Builds and caches compiled Grok instances and applies them to text. */
+object GrokHelper extends Logger {
+  // Compiled Grok instances, keyed by the name passed to getGrok.
+  private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty
+
+  /**
+   * Returns the Grok cached under `name`, building and compiling it on the first call.
+   * When an instance is already cached for `name`, the remaining arguments are ignored.
+   *
+   * @param name           cache key, also used as the log prefix
+   * @param patternFiles   file names resolved through `SparkFiles.get` and loaded into the Grok
+   * @param patterns       additional `pattern name -> pattern` definitions
+   * @param compilePattern the Grok expression to compile
+   */
+  def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok = {
+    grokPool.getOrElseUpdate(name, {
+      println(s"Grok '$name' initialized..")
+      val grok = new Grok()
+      patternFiles.foreach { patternFile =>
+        val filePath = SparkFiles.get(patternFile)
+        println(s"[Grok][$name] add pattern file : $patternFile ($filePath)")
+        grok.addPatternFromFile(filePath)
+      }
+      // Bound as `patternName` so the entry does not shadow the Grok's own `name` in the log prefix.
+      patterns.foreach { case (patternName, pattern) =>
+        println(s"[Grok][$name] add pattern : $patternName ($pattern)")
+        grok.addPattern(patternName, pattern)
+      }
+
+      grok.compile(compilePattern)
+      println(s"[Grok][$name] patterns: ${grok.getPatterns}")
+      grok
+    })
+  }
+
+  /**
+   * Matches `text` against the implicit `grok` and returns the captures as strings.
+   * Captures whose value is null are dropped.
+   *
+   * @return `None` when no non-null capture remains, otherwise the captured fields
+   */
+  def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+    val rstMap = m.toMap.asScala
+      .collect { case (k, v) if v != null => k -> v.toString }
+      .toMap
+    if (rstMap.isEmpty) None else Some(rstMap)
+  }
+}
