Add schema option for Grok UDF
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0ae2cb25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0ae2cb25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0ae2cb25 Branch: refs/heads/master Commit: 0ae2cb2516264f37d07425bb7b59981aabd8600a Parents: 26321ac Author: Chul Kang <[email protected]> Authored: Tue Jul 24 14:41:14 2018 +0900 Committer: Chul Kang <[email protected]> Committed: Tue Jul 24 14:41:14 2018 +0900 ---------------------------------------------------------------------- project/Common.scala | 2 +- .../org/apache/s2graph/s2jobs/udfs/Grok.scala | 19 +++++++++++++++++-- .../apache/s2graph/s2jobs/utils/GrokHelper.scala | 15 +++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0ae2cb25/project/Common.scala ---------------------------------------------------------------------- diff --git a/project/Common.scala b/project/Common.scala index 08552a8..af121fc 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -29,7 +29,7 @@ object Common { val hadoopVersion = "2.7.3" val tinkerpopVersion = "3.2.5" - val elastic4sVersion = "6.1.1" + val elastic4sVersion = "6.2.4" val KafkaVersion = "0.10.2.1" http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0ae2cb25/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala index 6ed2f00..ebcb41d 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala @@ -2,20 +2,35 @@ package org.apache.s2graph.s2jobs.udfs import 
org.apache.s2graph.s2jobs.utils.GrokHelper import org.apache.spark.sql.SparkSession -import play.api.libs.json.Json +import org.apache.spark.sql.types.{DataType, StructType} +import play.api.libs.json.{JsValue, Json} class Grok extends Udf { + import org.apache.spark.sql.functions.udf + def register(ss: SparkSession, name:String, options:Map[String, String]) = { // grok val patternDir = options.getOrElse("patternDir", "/tmp") val patternFiles = options.getOrElse("patternFiles", "").split(",").toSeq val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty) val compilePattern = options("compilePattern") + val schemaOpt = options.get("schema") patternFiles.foreach { patternFile => ss.sparkContext.addFile(s"${patternDir}/${patternFile}") } + implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern) - ss.udf.register(name, GrokHelper.grokMatch _) + + val f = if(schemaOpt.isDefined) { + val schema = DataType.fromJson(schemaOpt.get) + implicit val keys:Array[String] = schema.asInstanceOf[StructType].fieldNames + udf(GrokHelper.grokMatchWithSchema _, schema) + } else { + udf(GrokHelper.grokMatch _) + } + + + ss.udf.register(name, f) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0ae2cb25/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala index 01ecf65..37485c8 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala @@ -3,6 +3,7 @@ package org.apache.s2graph.s2jobs.utils import io.thekraken.grok.api.Grok import org.apache.s2graph.s2jobs.Logger import org.apache.spark.SparkFiles +import org.apache.spark.sql.Row import 
scala.collection.mutable @@ -41,4 +42,18 @@ object GrokHelper extends Logger { .map{ case (k, v) => k -> v.toString} if (rstMap.isEmpty) None else Some(rstMap) } + + def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = { + import scala.collection.JavaConverters._ + + val m = grok.`match`(text) + m.captures() + + val rstMap = m.toMap.asScala.toMap + if (rstMap.isEmpty) None + else { + val l = keys.map { key => rstMap.getOrElse(key, null)} + Some(Row.fromSeq(l)) + } + } }
