METRON-1707 Port Profiler to Spark (nickwallen) closes apache/metron#1150
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bfbf018 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bfbf018 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bfbf018 Branch: refs/heads/master Commit: 3bfbf018a9c3e1c74dc934901446b5111a0ada03 Parents: 6fb50a1 Author: nickwallen <n...@nickallen.org> Authored: Thu Aug 23 17:58:18 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Thu Aug 23 17:58:18 2018 -0400 ---------------------------------------------------------------------- dependencies_with_url.csv | 64 +++++- .../profiler/DefaultMessageDistributor.java | 1 - .../apache/metron/profiler/MessageRoute.java | 10 +- metron-analytics/metron-profiler-spark/pom.xml | 195 +++++++++++++++++++ .../metron/profiler/spark/BatchProfiler.java | 102 ++++++++++ .../profiler/spark/BatchProfilerConfig.java | 190 ++++++++++++++++++ .../spark/ProfileMeasurementAdapter.java | 132 +++++++++++++ .../spark/function/GroupByPeriodFunction.java | 60 ++++++ .../spark/function/HBaseWriterFunction.java | 171 ++++++++++++++++ .../spark/function/MessageRouterFunction.java | 113 +++++++++++ .../spark/function/ProfileBuilderFunction.java | 107 ++++++++++ .../profiler/spark/function/TaskUtils.java | 41 ++++ .../spark/BatchProfilerIntegrationTest.java | 111 +++++++++++ .../spark/function/HBaseWriterFunctionTest.java | 176 +++++++++++++++++ .../function/MessageRouterFunctionTest.java | 114 +++++++++++ .../function/ProfileBuilderFunctionTest.java | 98 ++++++++++ .../src/test/resources/log4j.properties | 31 +++ .../src/test/resources/telemetry.json | 100 ++++++++++ metron-analytics/pom.xml | 1 + .../configuration/profiler/ProfileResult.java | 4 + .../profiler/ProfileResultExpressions.java | 4 + pom.xml | 1 + 22 files changed, 1822 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index 6ac1f23..6b4385b 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -33,13 +33,18 @@ com.maxmind.geoip2:geoip2:jar:2.8.0:compile,Apache v2,https://github.com/maxmind com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile,CDDL,http://jaxb.java.net/ com.sun.xml.bind:jaxb-impl:jar:2.2.5-2:compile,CDDL,http://jaxb.java.net/ com.twitter:jsr166e:jar:1.1.0:compile,CC0 1.0 Universal,http://github.com/twitter/jsr166e +com.twitter:chill-java:jar:0.8.4:compile,ASLv2,https://github.com/twitter/chill +com.twitter:chill_2.11:jar:0.8.4:compile,ASLv2,https://github.com/twitter/chill it.unimi.dsi:fastutil:jar:7.0.6:compile,ASLv2,https://github.com/vigna/fastutil javassist:javassist:jar:3.12.1.GA:compile,Apache v2,http://www.javassist.org/ javax.activation:activation:jar:1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp +javax.activation:activation:jar:1.1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp javax.annotation:jsr250-api:jar:1.0:compile,COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0,http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html javax.annotation:javax.annotation-api:jar:1.3.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/ +javax.annotation:javax.annotation-api:jar:1.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/ javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/ javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net +javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile,CDDL 1.1,https://github.com/jax-rs/api 
javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/ javax.xml.bind:jaxb-api:jar:2.2.2:compile,CDDL,https://jaxb.dev.java.net/ javax.xml.bind:jaxb-api:jar:2.3.0:compile,CDDL,https://jaxb.dev.java.net/ @@ -47,25 +52,41 @@ javax.xml.stream:stax-api:jar:1.0-2:compile,COMMON DEVELOPMENT AND DISTRIBUTION jline:jline:jar:0.9.94:compile,BSD,http://jline.sourceforge.net junit:junit:jar:4.12:compile,Eclipse Public License 1.0,http://junit.org junit:junit:jar:4.4:compile,Common Public License Version 1.0,http://junit.org +net.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite net.sf.jopt-simple:jopt-simple:jar:3.2:compile,The MIT License,http://jopt-simple.sourceforge.net net.sf.jopt-simple:jopt-simple:jar:4.9:compile,The MIT License,http://jopt-simple.sourceforge.net net.sf.saxon:Saxon-HE:jar:9.5.1-5:compile,Mozilla Public License Version 2.0,http://www.saxonica.com/ org.abego.treelayout:org.abego.treelayout.core:jar:1.0.1:compile,BSD 3-Clause "New" or "Revised" License (BSD-3-Clause),http://code.google.com/p/treelayout/ org.adrianwalker:multiline-string:jar:0.1.2:compile,Common Public License Version 1.0,https://github.com/benelog/multiline org.antlr:antlr4-runtime:jar:4.5:compile,BSD 3-Clause License,http://www.antlr.org +org.bouncycastle:bcprov-jdk15on:jar:1.52:compile,MIT,https://www.bouncycastle.org/license.html org.clojure:clojure:jar:1.6.0:compile,Eclipse Public License 1.0,http://clojure.org/ org.clojure:clojure:jar:1.7.0:compile,Eclipse Public License 1.0,http://clojure.org/ org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org org.codehaus.jackson:jackson-jaxrs:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org org.codehaus.jackson:jackson-xc:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org org.codehaus.jackson:jackson-xc:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org +org.codehaus.janino:commons-compiler:jar:3.0.8:compile,New 
BSD,https://github.com/janino-compiler/janino +org.codehaus.janino:janino:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2 +org.json4s:json4s-ast_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s +org.json4s:json4s-core_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s +org.json4s:json4s-jackson_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s org.jruby.jcodings:jcodings:jar:1.0.8:compile,MIT License,https://github.com/jruby/jcodings org.jruby.joni:joni:jar:2.1.2:compile,MIT License,https://github.com/jruby/joni +org.lz4:lz4-java:jar:1.4.0:compile,ASLv2,https://github.com/lz4/lz4-java org.mitre.taxii:taxii:jar:1.1.0.1:compile,The BSD 3-Clause License,https://github.com/TAXIIProject/java-taxii org.mitre:stix:jar:1.2.0.2:compile,The BSD 3-Clause License,https://github.com/STIXProject/java-stix org.mockito:mockito-core:jar:1.10.19:compile,The MIT License,http://www.mockito.org +org.roaringbitmap:RoaringBitmap:jar:0.5.11:compile,ASLv2,https://github.com/RoaringBitmap/RoaringBitmap org.scala-lang:scala-library:jar:2.10.6:compile,BSD-like,http://www.scala-lang.org/ +org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile,BSD-like,http://www.scala-lang.org/ +org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile,BSD-like,http://www.scala-lang.org/ +org.scala-lang:scala-compiler:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/ +org.scala-lang:scala-library:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/ +org.scala-lang:scala-reflect:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/ +org.scala-lang:scalap:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/ +oro:oro:jar:2.0.8:compile,ASLv2,http://attic.apache.org/projects/jakarta-oro.html xmlenc:xmlenc:jar:0.52:compile,The BSD License,http://xmlenc.sourceforge.net asm:asm:jar:3.1:compile,BSD,http://asm.ow2.org/ 
com.sun.jersey.contribs:jersey-guice:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/ @@ -103,7 +124,10 @@ org.slf4j:slf4j-log4j12:jar:1.7.5:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-log4j12:jar:1.7.7:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-simple:jar:1.7.7:compile,MIT,http://www.slf4j.org org.slf4j:jcl-over-slf4j:jar:1.7.7:compile,MIT,http://www.slf4j.org +org.slf4j:jcl-over-slf4j:jar:1.7.16:compile,MIT,http://www.slf4j.org org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org +org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org +org.slf4j:jul-to-slf4j:jar:1.7.16:compile,MIT,http://www.slf4j.org org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org org.slf4j:jul-to-slf4j:jar:1.7.25:compile,MIT,http://www.slf4j.org aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net @@ -113,7 +137,9 @@ com.github.tony19:named-regexp:jar:0.2.3:compile,Apache License, Version 2.0, com.google.code.findbugs:jsr305:jar:1.3.9:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/ com.google.code.findbugs:jsr305:jar:3.0.0:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/ com.google.code.findbugs:annotations:jar:2.0.1:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/ -com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2, +com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,https://github.com/carrotsearch/hppc +com.carrotsearch:hppc:jar:0.7.2:compile,ASLv2,https://github.com/carrotsearch/hppc +com.clearspring.analytics:stream:jar:2.7.0:compile,ASLv2,https://github.com/addthis/stream-lib com.clearspring.analytics:stream:jar:2.9.5:compile,ASLv2,https://github.com/addthis/stream-lib com.codahale.metrics:metrics-core:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics com.codahale.metrics:metrics-graphite:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics @@ -132,6 +158,7 @@ 
com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.c com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson +com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson @@ -147,6 +174,8 @@ com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.9.5:compile,ASLv2,htt com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8 com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8 com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8 +com.fasterxml.jackson.module:jackson-module-paranamer:jar:2.7.9:compile,ASLv2,https://github.com/FasterXML/jackson-modules-base +com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.6.7.1:compile,ASLv2,https://github.com/FasterXML/jackson-module-scala com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate com.fasterxml:classmate:jar:1.3.4:compile,ASLv2,http://github.com/cowtowncoder/java-classmate com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/ @@ -164,10 +193,14 @@ com.lmax:disruptor:jar:3.3.2:compile,The Apache Software License, Version 2.0,ht 
com.googlecode.json-simple:json-simple:jar:1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/ com.googlecode.json-simple:json-simple:jar:1.1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/ com.jamesmurty.utils:java-xmlbuilder:jar:0.4:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/ +com.jamesmurty.utils:java-xmlbuilder:jar:1.1:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/ com.ning:compress-lzf:jar:1.0.2:compile,Apache License 2.0,http://github.com/ning/compress +com.ning:compress-lzf:jar:1.0.3:compile,Apache License 2.0,http://github.com/ning/compress com.opencsv:opencsv:jar:3.7:compile,Apache 2,http://opencsv.sf.net com.spatial4j:spatial4j:jar:0.5:compile,The Apache Software License, Version 2.0, com.tdunning:t-digest:jar:3.0:compile,The Apache Software License, Version 2.0,https://github.com/tdunning/t-digest +com.univocity:univocity-parsers:jar:2.5.9:compile,ASLv2,https://github.com/uniVocity/univocity-parsers +com.vlkan:flatbuffers:jar:1.2.0-3f79e055:compile,ASLv2,https://github.com/vy/flatbuffers com.yammer.metrics:metrics-core:jar:2.2.0:compile,ASLv2, commons-beanutils:commons-beanutils-core:jar:1.8.0:compile,ASLv2,http://commons.apache.org/beanutils/ commons-beanutils:commons-beanutils-core:jar:1.8.0:provided,ASLv2,http://commons.apache.org/beanutils/ @@ -203,19 +236,28 @@ commons-lang:commons-lang:jar:2.6:provided,ASLv2,http://commons.apache.org/lang/ commons-logging:commons-logging:jar:1.1.1:compile,ASLv2,http://commons.apache.org/logging commons-logging:commons-logging:jar:1.1.3:compile,ASLv2,http://commons.apache.org/proper/commons-logging/ commons-logging:commons-logging:jar:1.2:compile,ASLv2,http://commons.apache.org/proper/commons-logging/ +commons-net:commons-net:jar:2.2:compile,ASLv2,http://commons.apache.org/net/ commons-net:commons-net:jar:3.1:compile,ASLv2,http://commons.apache.org/net/ 
commons-net:commons-net:jar:3.1:provided,ASLv2,http://commons.apache.org/net/ commons-text:commons-text:jar:1.1:compile,ASLv2,http://commons.apache.org/proper/commons-text/ commons-validator:commons-validator:jar:1.4.0:compile,ASLv2,http://commons.apache.org/validator/ commons-validator:commons-validator:jar:1.5.1:compile,ASLv2,http://commons.apache.org/proper/commons-validator/ commons-validator:commons-validator:jar:1.6:compile,ASLv2,http://commons.apache.org/proper/commons-validator/ +net.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite +io.airlift:aircompressor:jar:0.8:compile,ASLv2,https://github.com/airlift/aircompressor io.confluent:kafka-avro-serializer:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/ io.confluent:kafka-schema-registry-client:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/ +io.dropwizard.metrics:metrics-core:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics +io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics +io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics +io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2, io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2, -io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/ io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2, io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/ io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/ +io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/ +io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/ io.thekraken:grok:jar:0.1.0:compile,Apache License, Version 2.0,http://maven.apache.org javax.inject:javax.inject:jar:1:compile,The Apache
Software License, Version 2.0,http://code.google.com/p/atinject/ joda-time:joda-time:jar:2.3:compile,Apache 2,http://www.joda.org/joda-time/ @@ -224,9 +266,12 @@ joda-time:joda-time:jar:2.9.9:compile,Apache 2,http://www.joda.org/joda-time/ log4j:log4j:jar:1.2.15:compile,The Apache Software License, Version 2.0,http://logging.apache.org:80/log4j/1.2/ log4j:log4j:jar:1.2.16:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/ log4j:log4j:jar:1.2.17:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/ +net.iharder:base64:jar:2.3.8:compile,Public Domain,http://iharder.sourceforge.net/current/java/base64/ net.java.dev.jets3t:jets3t:jar:0.9.0:compile,Apache License, Version 2.0,http://www.jets3t.org +net.java.dev.jets3t:jets3t:jar:0.9.4:compile,Apache License, Version 2.0,http://www.jets3t.org net.jpountz.lz4:lz4:jar:1.2.0:compile,The Apache Software License, Version 2.0,https://github.com/jpountz/lz4-java net.jpountz.lz4:lz4:jar:1.3.0:compile,The Apache Software License, Version 2.0,https://github.com/jpountz/lz4-java +net.sf.py4j:py4j:jar:0.10.7:compile,, nl.jqno.equalsverifier:equalsverifier:jar:2.0.2:compile,The Apache Software License, Version 2.0,http://www.jqno.nl/equalsverifier org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org @@ -354,6 +399,7 @@ org.springframework.security:spring-security-core:jar:4.1.3.RELEASE:compile,ASLv org.springframework.security:spring-security-core:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security org.springframework.security:spring-security-web:jar:4.1.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security 
org.springframework.security:spring-security-web:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security +org.spark-project.spark:unused:jar:1.0.0:compile,ASLv2,https://spark.apache.org antlr:antlr:jar:2.7.7:compile,BSD 3-Clause License,http://www.antlr2.org com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/license.html com.h2database:h2:jar:1.4.197:compile,EPL 1.0,http://www.h2database.com/html/license.html @@ -370,6 +416,7 @@ org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://g org.springframework.kafka:spring-kafka:jar:2.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka ch.hsr:geohash:jar:1.3.0:compile,ASLv2,https://github.com/kungfoo/geohash-java org.locationtech.spatial4j:spatial4j:jar:0.6:compile,ASLv2,https://github.com/locationtech/spatial4j +com.github.luben:zstd-jni:jar:1.3.2-2:compile,BSD,https://github.com/luben/zstd-jni com.github.spullara.mustache.java:compiler:jar:0.9.3:compile,ASLv2,https://github.com/spullara/mustache.java/blob/master/LICENSE io.netty:netty-buffer:jar:4.1.13.Final:compile,ASLv2,http://netty.io/ io.netty:netty-codec-http:jar:4.1.13.Final:compile,ASLv2,http://netty.io/ @@ -395,6 +442,19 @@ org.elasticsearch:securesm:jar:1.1:compile,ASLv2,https://github.com/elastic/elas org.hdrhistogram:HdrHistogram:jar:2.1.9:compile,BSD,https://github.com/HdrHistogram/HdrHistogram/blob/master/LICENSE.txt com.trendmicro:tlsh:jar:3.7.1:compile,ASLv2,https://github.com/trendmicro/tlsh org.glassfish:javax.json:jar:1.0.4:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/jsonp +org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2 +org.glassfish.hk2.external:javax.inject:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2 
+org.glassfish.hk2:hk2-api:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2 +org.glassfish.hk2:hk2-locator:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2 +org.glassfish.hk2:hk2-utils:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2 +org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2 +org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey +org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey +org.glassfish.jersey.containers:jersey-container-servlet:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey +org.glassfish.jersey.core:jersey-client:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey +org.glassfish.jersey.core:jersey-common:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey +org.glassfish.jersey.core:jersey-server:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey +org.glassfish.jersey.media:jersey-media-jaxb:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey org.eclipse.persistence:javax.persistence:jar:2.1.1:compile,EPL 1.0,http://www.eclipse.org/eclipselink org.eclipse.persistence:org.eclipse.persistence.antlr:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index d950b07..673072b 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -28,7 +28,6 @@ import com.google.common.cache.RemovalNotification; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.stellar.dsl.Context; -import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java index e76b897..7cdb607 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java @@ -22,11 +22,11 @@ package org.apache.metron.profiler; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.json.simple.JSONObject; import java.io.Serializable; +import java.util.Map; /** * Defines the 'route' a message must take through the Profiler. 
@@ -74,6 +74,10 @@ public class MessageRoute implements Serializable { this.timestamp = timestamp; } + public MessageRoute() { + // necessary for serialization + } + public String getEntity() { return entity; } @@ -98,6 +102,10 @@ public class MessageRoute implements Serializable { this.message = message; } + public void setMessage(Map message) { + this.message = new JSONObject(message); + } + public Long getTimestamp() { return timestamp; } http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml new file mode 100644 index 0000000..93ce08a --- /dev/null +++ b/metron-analytics/metron-profiler-spark/pom.xml @@ -0,0 +1,195 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-analytics</artifactId> + <version>0.5.1</version> + </parent> + <artifactId>metron-profiler-spark</artifactId> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>${global_spark_version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>${global_spark_version}</version> + <exclusions> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-profiler-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.storm</groupId> + 
<artifactId>storm-hbase</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${global_hbase_version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <!-- allows profiles to use the Stellar stats functions --> + <groupId>org.apache.metron</groupId> + <artifactId>metron-statistics</artifactId> + <version>${project.parent.version}</version> + <exclusions> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${global_log4j_core_version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + 
<excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>com.tdunning</pattern> + <shadedPattern>org.apache.metron.tdunning</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <exclude>storm:storm-core:*</exclude> + <exclude>storm:storm-lib:*</exclude> + <exclude>org.slf4j.impl*</exclude> + <exclude>org.slf4j:slf4j-log4j*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java new file mode 100644 index 0000000..f999613 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.spark; + +import com.google.common.collect.Maps; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.spark.function.GroupByPeriodFunction; +import org.apache.metron.profiler.spark.function.HBaseWriterFunction; +import org.apache.metron.profiler.spark.function.MessageRouterFunction; +import org.apache.metron.profiler.spark.function.ProfileBuilderFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.Properties; + +import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH; +import static org.apache.spark.sql.functions.sum; + +/** + * The 'Batch Profiler' that generates profiles by consuming data in batch from archived telemetry. + * + * <p>The Batch Profiler is executed in Spark. 
+ */ +public class BatchProfiler implements Serializable { + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Execute the Batch Profiler. + * + * @param spark The spark session. + * @param properties The profiler configuration properties. + * @param profiles The profile definitions. + * @return The number of profile measurements produced. + */ + public long run(SparkSession spark, + Properties properties, + Properties globalProperties, + ProfilerConfig profiles) { + + LOG.debug("Building {} profile(s)", profiles.getProfiles().size()); + Map<String, String> globals = Maps.fromProperties(globalProperties); + + String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class); + String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class); + LOG.debug("Loading telemetry from '{}'", inputPath); + + // fetch the archived telemetry + Dataset<String> telemetry = spark + .read() + .format(inputFormat) + .load(inputPath) + .as(Encoders.STRING()); + LOG.debug("Found {} telemetry record(s)", telemetry.cache().count()); + + // find all routes for each message + Dataset<MessageRoute> routes = telemetry + .flatMap(new MessageRouterFunction(profiles, globals), Encoders.bean(MessageRoute.class)); + LOG.debug("Generated {} message route(s)", routes.cache().count()); + + // build the profiles + Dataset<ProfileMeasurementAdapter> measurements = routes + .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING()) + .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class)); + LOG.debug("Produced {} profile measurement(s)", measurements.cache().count()); + + // write the profile measurements to HBase + long count = measurements + .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT()) + .agg(sum("value")) + .head() + .getLong(0); + LOG.debug("{} profile measurement(s) written to HBase", count); + + return count; + } +} \ No newline at 
end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java new file mode 100644 index 0000000..054806e --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.spark; + +import org.apache.hadoop.hbase.client.Durability; +import org.apache.metron.stellar.common.utils.ConversionUtils; + +import java.util.Map; +import java.util.Properties; + +/** + * Defines the configuration values recognized by the Batch Profiler. 
+ */ +public enum BatchProfilerConfig { + + PERIOD_DURATION_UNITS("profiler.period.duration.units", "MINUTES", String.class), + + PERIOD_DURATION("profiler.period.duration", 15, Integer.class), + + HBASE_SALT_DIVISOR("profiler.hbase.salt.divisor", 1000, Integer.class), + + HBASE_TABLE_PROVIDER("profiler.hbase.table.provider", "org.apache.metron.hbase.HTableProvider", String.class), + + HBASE_TABLE_NAME("profiler.hbase.table", "profiler", String.class), + + HBASE_COLUMN_FAMILY("profiler.hbase.column.family", "P", String.class), + + HBASE_WRITE_DURABILITY("profiler.hbase.durability", Durability.USE_DEFAULT, Durability.class), + + TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "text", String.class), + + TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class); + + /** + * The key for the configuration value. + */ + private String key; + + /** + * The default value of the configuration, if none other is specified. + */ + private Object defaultValue; + + /** + * The type of the configuration value. + */ + private Class<?> valueType; + + BatchProfilerConfig(String key, Object defaultValue, Class<?> valueType) { + this.key = key; + this.defaultValue = defaultValue; + this.valueType = valueType; + } + + /** + * Returns the key of the configuration value. + */ + public String getKey() { + return key; + } + + /** + * Returns the default value of the configuration. + */ + public Object getDefault() { + return getDefault(valueType); + } + + /** + * Returns the default value of the configuration, cast to the expected type. + * + * @param clazz The class of the expected type of the configuration value. + * @param <T> The expected type of the configuration value. + */ + public <T> T getDefault(Class<T> clazz) { + return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values. 
+ * + * @param config A map containing configuration values. + */ + public Object get(Map<String, String> config) { + return getOrDefault(config, defaultValue); + } + + /** + * Returns the configuration value from a map of configuration values. + * + * @param properties Configuration properties. + */ + public Object get(Properties properties) { + return getOrDefault(properties, defaultValue); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param config A map containing configuration values. + */ + public <T> T get(Map<String, String> config, Class<T> clazz) { + return getOrDefault(config, defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values, cast to the expected type. + * + * @param properties Configuration properties. + */ + public <T> T get(Properties properties, Class<T> clazz) { + return getOrDefault(properties, defaultValue, clazz); + } + + /** + * Returns the configuration value from a map of configuration values. If the value is not specified, + * the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. + */ + private Object getOrDefault(Map<String, String> config, Object defaultValue) { + return getOrDefault(config, defaultValue, valueType); + } + + /** + * Returns the configuration value from a map of configuration values. If the value is not specified, + * the default value is returned. + * + * @param properties A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @return The configuration value or the specified default, if one is not defined. 
+ */ + private Object getOrDefault(Properties properties, Object defaultValue) { + return getOrDefault(properties, defaultValue, valueType); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param config A map containing configuration values. + * @param defaultValue The default value to return, if one is not defined. + * @param clazz The class of the expected type of the configuration value. + * @param <T> The expected type of the configuration value. + * @return The configuration value or the specified default, if one is not defined. + */ + private <T> T getOrDefault(Map<String, String> config, Object defaultValue, Class<T> clazz) { + Object value = config.getOrDefault(key, defaultValue.toString()); + return value == null ? null : ConversionUtils.convert(value, clazz); + } + + /** + * Returns the configuration value, cast to the expected type, from a map of configuration values. + * If the value is not specified, the default value is returned. + * + * @param properties Configuration properties. + * @param defaultValue The default value to return, if one is not defined. + * @param clazz The class of the expected type of the configuration value. + * @param <T> The expected type of the configuration value. + * @return The configuration value or the specified default, if one is not defined. + */ + private <T> T getOrDefault(Properties properties, Object defaultValue, Class<T> clazz) { + Object value = properties.getOrDefault(key, defaultValue); + return value == null ? 
null : ConversionUtils.convert(value, clazz); + } + + @Override + public String toString() { + return key; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java new file mode 100644 index 0000000..5da7d04 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.metron.profiler.spark; + +import org.apache.metron.common.utils.SerDeUtils; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.ProfilePeriod; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * An adapter for the {@link ProfileMeasurement} class so that the data + * can be serialized as required by Spark. + * + * <p>The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object` well. This + * adapter encodes the profile's result as byte[] rather than an Object to work around this. + */ +public class ProfileMeasurementAdapter implements Serializable { + + /** + * The name of the profile that this measurement is associated with. + */ + private String profileName; + + /** + * The name of the entity being profiled. + */ + private String entity; + + /** + * A monotonically increasing number identifying the period. The first period is 0 + * and began at the epoch. + */ + private Long periodId; + + /** + * The duration of each period in milliseconds. + */ + private Long durationMillis; + + /** + * The result of evaluating the profile expression. + * + * The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object`. This + * adapter encodes the profile's result as `byte[]` rather than an `Object` to work around this. 
+ */ + private byte[] profileValue; + + public ProfileMeasurementAdapter() { + // default constructor required for serialization in Spark + } + + public ProfileMeasurementAdapter(ProfileMeasurement measurement) { + this.profileName = measurement.getProfileName(); + this.entity = measurement.getEntity(); + this.periodId = measurement.getPeriod().getPeriod(); + this.durationMillis = measurement.getPeriod().getDurationMillis(); + this.profileValue = SerDeUtils.toBytes(measurement.getProfileValue()); + } + + public ProfileMeasurement toProfileMeasurement() { + ProfilePeriod period = ProfilePeriod.fromPeriodId(periodId, durationMillis, TimeUnit.MILLISECONDS); + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName(profileName) + .withEntity(entity) + .withPeriod(period) + .withProfileValue(SerDeUtils.fromBytes(profileValue, Object.class)); + return measurement; + } + + public String getProfileName() { + return profileName; + } + + public void setProfileName(String profileName) { + this.profileName = profileName; + } + + public String getEntity() { + return entity; + } + + public void setEntity(String entity) { + this.entity = entity; + } + + public Long getPeriodId() { + return periodId; + } + + public void setPeriodId(Long periodId) { + this.periodId = periodId; + } + + public Long getDurationMillis() { + return durationMillis; + } + + public void setDurationMillis(Long durationMillis) { + this.durationMillis = durationMillis; + } + + public byte[] getProfileValue() { + return profileValue; + } + + public void setProfileValue(byte[] profileValue) { + this.profileValue = profileValue; + } + + public void setProfileValue(Object profileValue) { + this.profileValue = SerDeUtils.toBytes(profileValue); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java 
---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java new file mode 100644 index 0000000..1b602f4 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.spark.function; + +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.ProfilePeriod; +import org.apache.spark.api.java.function.MapFunction; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS; + +/** + * Defines how {@link MessageRoute} are grouped. 
+ * + * The routes are grouped by (profile, entity, periodId) so that all of the required + * messages are available to produce a {@link org.apache.metron.profiler.ProfileMeasurement}. + */ +public class GroupByPeriodFunction implements MapFunction<MessageRoute, String> { + + /** + * The duration of each profile period. + */ + private int periodDuration; + + /** + * The units of the period duration. + */ + private TimeUnit periodDurationUnits; + + public GroupByPeriodFunction(Properties profilerProperties) { + periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(profilerProperties, String.class)); + periodDuration = PERIOD_DURATION.get(profilerProperties, Integer.class); + } + + @Override + public String call(MessageRoute route) { + ProfilePeriod period = ProfilePeriod.fromTimestamp(route.getTimestamp(), periodDuration, periodDurationUnits); + return route.getProfileDefinition().getProfile() + "-" + route.getEntity() + "-" + period.getPeriod(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java new file mode 100644 index 0000000..cfabd94 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java @@ -0,0 +1,171 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.spark.function; + +import org.apache.commons.collections4.IteratorUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.metron.hbase.HTableProvider; +import org.apache.metron.hbase.TableProvider; +import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.hbase.ColumnBuilder; +import org.apache.metron.profiler.hbase.RowKeyBuilder; +import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; +import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; +import org.apache.metron.profiler.spark.ProfileMeasurementAdapter; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_SALT_DIVISOR; +import static 
org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_WRITE_DURABILITY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;

/**
 * Writes the profile measurements to HBase in Spark.
 */
public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurementAdapter, Integer> {

  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /**
   * Creates the connection to HBase.
   */
  private TableProvider tableProvider;

  /**
   * The name of the HBase table to write to.
   */
  private String tableName;

  /**
   * The durability guarantee when writing to HBase.
   */
  private Durability durability;

  /**
   * Builds the HBase row key.
   */
  private RowKeyBuilder rowKeyBuilder;

  /**
   * Assembles the columns for HBase.
   */
  private ColumnBuilder columnBuilder;

  public HBaseWriterFunction(Properties properties) {
    tableName = HBASE_TABLE_NAME.get(properties, String.class);
    durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);

    // row key builder
    int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
    int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);

    // column builder
    String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);

    // hbase table provider
    String providerImpl = HBASE_TABLE_PROVIDER.get(properties, String.class);
    tableProvider = createTableProvider(providerImpl);
  }

  /**
   * Writes a set of measurements to HBase.
   *
   * @param iterator The measurements to write.
   * @return The number of measurements written to HBase.
   */
  @Override
  public Iterator<Integer> call(Iterator<ProfileMeasurementAdapter> iterator) throws Exception {
    int count = 0;
    LOG.debug("About to write profile measurement(s) to HBase");

    // do not open hbase connection, if nothing to write
    List<ProfileMeasurementAdapter> measurements = IteratorUtils.toList(iterator);
    if(measurements.size() > 0) {

      // open an HBase connection
      Configuration config = HBaseConfiguration.create();
      try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {

        for (ProfileMeasurementAdapter adapter : measurements) {
          ProfileMeasurement m = adapter.toProfileMeasurement();
          client.addMutation(rowKeyBuilder.rowKey(m), columnBuilder.columns(m), durability);
        }
        count = client.mutate();

      } catch (IOException e) {
        LOG.error("Unable to open connection to HBase", e);
        throw new RuntimeException(e);
      }
    }

    LOG.debug("{} profile measurement(s) written to HBase", count);
    return IteratorUtils.singletonIterator(count);
  }

  /**
   * Set the {@link TableProvider} using the class name of the provider.
   *
   * @param providerImpl The name of the class.
   * @return This function, for chaining.
   */
  public HBaseWriterFunction withTableProviderImpl(String providerImpl) {
    this.tableProvider = createTableProvider(providerImpl);
    return this;
  }

  /**
   * Creates a TableProvider based on a class name.
   *
   * @param providerImpl The class name of a TableProvider.
   * @throws IllegalStateException If the class cannot be loaded, instantiated, or does
   *         not implement {@link TableProvider}.
   */
  private static TableProvider createTableProvider(String providerImpl) {
    LOG.trace("Creating table provider; className={}", providerImpl);

    // if class name not defined (or looks like an unresolved '${...}' placeholder —
    // presumably from property substitution; TODO confirm), use a reasonable default
    if(StringUtils.isEmpty(providerImpl) || providerImpl.charAt(0) == '$') {
      return new HTableProvider();
    }

    // instantiate the table provider
    try {
      Class<?> clazz = Class.forName(providerImpl);

      // fix: validate before casting so a class that does not implement TableProvider
      // fails fast with a clear message, instead of a late ClassCastException
      if (!TableProvider.class.isAssignableFrom(clazz)) {
        throw new IllegalStateException(providerImpl + " does not implement TableProvider");
      }
      return (TableProvider) clazz.getConstructor().newInstance();

    } catch (InstantiationException | IllegalAccessException |
            InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
      throw new IllegalStateException("Unable to instantiate connector", e);
    }
  }
}
+ * + */ +package org.apache.metron.profiler.spark.function; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.DefaultMessageRouter; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.MessageRouter; +import org.apache.metron.stellar.dsl.Context; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * The function responsible for finding routes for a given message in Spark. + */ +public class MessageRouterFunction implements FlatMapFunction<String, MessageRoute> { + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The global configuration used for the execution of Stellar. + */ + private Map<String, String> globals; + + /** + * The profile definitions. + */ + private ProfilerConfig profilerConfig; + + public MessageRouterFunction(ProfilerConfig profilerConfig, Map<String, String> globals) { + this.profilerConfig = profilerConfig; + this.globals = globals; + } + + /** + * Find all routes for a given telemetry message. + * + * <p>A message may need routed to multiple profiles should it be needed by more than one. A + * message may also not be routed should it not be needed by any profiles. + * + * @param jsonMessage The raw JSON message. + * @return A list of message routes. 
+ */ + @Override + public Iterator<MessageRoute> call(String jsonMessage) throws Exception { + List<MessageRoute> routes; + + JSONParser parser = new JSONParser(); + Context context = TaskUtils.getContext(globals); + MessageRouter router = new DefaultMessageRouter(context); + + // parse the raw message + Optional<JSONObject> message = toMessage(jsonMessage, parser); + if(message.isPresent()) { + + // find all routes + routes = router.route(message.get(), profilerConfig, context); + LOG.trace("Found {} route(s) for a message", routes.size()); + + } else { + // the message is not valid and must be ignored + routes = Collections.emptyList(); + LOG.trace("No route possible. Unable to parse message."); + } + + return routes.iterator(); + } + + /** + * Parses the raw JSON of a message. + * + * @param json The raw JSON to parse. + * @param parser The parser to use. + * @return The parsed telemetry message. + */ + private static Optional<JSONObject> toMessage(String json, JSONParser parser) { + try { + JSONObject message = (JSONObject) parser.parse(json); + return Optional.of(message); + + } catch(Throwable e) { + LOG.warn(String.format("Unable to parse message, message will be ignored; message='%s'", json), e); + return Optional.empty(); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java new file mode 100644 index 0000000..273695b --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java @@ -0,0 +1,107 @@ +/* + * + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.spark.function; + +import org.apache.metron.profiler.DefaultMessageDistributor; +import org.apache.metron.profiler.MessageDistributor; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.spark.ProfileMeasurementAdapter; +import org.apache.metron.stellar.dsl.Context; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Comparator.comparing; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION; +import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS; + +/** + * The function responsible for building profiles in Spark. 
+ */ +public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurementAdapter> { + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private long periodDurationMillis; + private Map<String, String> globals; + + public ProfileBuilderFunction(Properties properties, Map<String, String> globals) { + TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class)); + int periodDuration = PERIOD_DURATION.get(properties, Integer.class); + this.periodDurationMillis = periodDurationUnits.toMillis(periodDuration); + this.globals = globals; + } + + /** + * Build a profile from a set of message routes. + * + * <p>This assumes that all of the necessary routes have been provided + * + * @param group The group identifier. + * @param iterator The message routes. + * @return + */ + @Override + public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator) throws Exception { + // create the distributor; some settings are unnecessary because it is cleaned-up immediately after processing the batch + int maxRoutes = Integer.MAX_VALUE; + long profileTTLMillis = Long.MAX_VALUE; + MessageDistributor distributor = new DefaultMessageDistributor(periodDurationMillis, profileTTLMillis, maxRoutes); + Context context = TaskUtils.getContext(globals); + + // sort the messages/routes + List<MessageRoute> routes = toStream(iterator) + .sorted(comparing(rt -> rt.getTimestamp())) + .collect(Collectors.toList()); + LOG.debug("Building a profile for group '{}' from {} message(s)", group, routes.size()); + + // apply each message/route to build the profile + for(MessageRoute route: routes) { + distributor.distribute(route, context); + } + + // flush the profile + List<ProfileMeasurement> measurements = distributor.flush(); + if(measurements.size() > 1) { + throw new IllegalStateException("No more than 1 profile measurement is expected"); + } + + 
ProfileMeasurement m = measurements.get(0); + LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}", + m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue()); + return new ProfileMeasurementAdapter(m); + } + + private static <T> Stream<T> toStream(Iterator<T> iterator) { + Iterable<T> iterable = () -> iterator; + return StreamSupport.stream(iterable.spliterator(), false); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java new file mode 100644 index 0000000..d401f12 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.metron.profiler.spark.function; + +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctions; + +import java.io.Serializable; +import java.util.Map; + +public class TaskUtils implements Serializable { + + /** + * Create the execution context for running Stellar. + */ + public static Context getContext(Map<String, String> globals) { + Context context = new Context.Builder() + .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals) + .with(Context.Capabilities.STELLAR_CONFIG, () -> globals) + .build(); + StellarFunctions.initialize(context); + return context; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java new file mode 100644 index 0000000..f560740 --- /dev/null +++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.metron.profiler.spark;

import org.apache.hadoop.hbase.client.Put;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;
import java.util.Properties;

import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
import static org.junit.Assert.assertEquals;

/**
 * An integration test that runs the {@link BatchProfiler} end-to-end on a local
 * Spark session, reading telemetry from a file and writing to a mock HBase table.
 */
public class BatchProfilerIntegrationTest {

  private static SparkSession spark;
  private MockHTable profilerTable;
  private Properties profilerProperties;

  /**
   * Starts a local Spark session shared by all tests in this class.
   */
  @BeforeClass
  public static void setupSpark() {
    SparkConf conf = new SparkConf();
    conf.setMaster("local");
    conf.setAppName("BatchProfilerIntegrationTest");
    conf.set("spark.sql.shuffle.partitions", "8");
    spark = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();
  }

  /**
   * Stops the shared Spark session, if one was started.
   */
  @AfterClass
  public static void tearDownSpark() {
    if(spark == null) {
      return;
    }
    spark.close();
  }

  @Before
  public void setup() {
    profilerProperties = new Properties();

    // the input telemetry is read from a local file
    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");

    // the output is written to a mock HBase table
    profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());

    // create the mock hbase table using the default table name and column family
    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
  }

  @Test
  public void testBatchProfiler() {

    // run the batch profiler over the test telemetry
    BatchProfiler profiler = new BatchProfiler();
    profiler.run(spark, profilerProperties, getGlobals(), getProfile());

    // 2 measurements are expected to have been written to the table
    List<Put> written = profilerTable.getPutLog();
    assertEquals(2, written.size());
  }

  /**
   * Returns the profile definition used for testing; a simple counter
   * for each 'ip_src_addr'.
   */
  private ProfilerConfig getProfile() {
    ProfileConfig counter = new ProfileConfig()
            .withProfile("profile1")
            .withForeach("ip_src_addr")
            .withUpdate("count", "count + 1")
            .withResult("count");
    return new ProfilerConfig()
            .withProfile(counter);
  }

  /**
   * Returns the global configuration used for testing; intentionally empty.
   */
  private Properties getGlobals() {
    return new Properties();
  }
}

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
new file mode 100644
index 0000000..55f3e21
--- /dev/null
+++
b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -0,0 +1,176 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.metron.profiler.spark.function;

import org.apache.commons.collections4.IteratorUtils;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;

/**
 * Tests the {@link HBaseWriterFunction} against a mock HBase table.
 */
public class HBaseWriterFunctionTest {

  Properties profilerProperties;

  @Before
  public void setup() {
    profilerProperties = getProfilerProperties();

    // create a mock table for HBase using the default table name and column family
    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
    MockHBaseTableProvider.addToCache(tableName, columnFamily);
  }

  @Test
  public void testWrite() throws Exception {

    JSONObject message = getMessage();

    // a single measurement should be written
    List<ProfileMeasurementAdapter> measurements = createMeasurements(
            1,
            (String) message.get("ip_src_addr"),
            (Long) message.get("timestamp"),
            getProfile());

    // write the measurements
    Iterator<Integer> results = createFunction().call(measurements.iterator());

    // expect a single count of 1; one measurement written
    List<Integer> counts = IteratorUtils.toList(results);
    Assert.assertEquals(1, counts.size());
    Assert.assertEquals(1, counts.get(0).intValue());
  }

  @Test
  public void testWriteMany() throws Exception {

    JSONObject message = getMessage();

    // 10 measurements should be written
    List<ProfileMeasurementAdapter> measurements = createMeasurements(
            10,
            (String) message.get("ip_src_addr"),
            (Long) message.get("timestamp"),
            getProfile());

    // write the measurements
    Iterator<Integer> results = createFunction().call(measurements.iterator());

    // expect a single count of 10; all measurements written
    List<Integer> counts = IteratorUtils.toList(results);
    Assert.assertEquals(1, counts.size());
    Assert.assertEquals(10, counts.get(0).intValue());
  }

  @Test
  public void testWriteNone() throws Exception {

    // there are no profile measurements to write
    List<ProfileMeasurementAdapter> nothing = new ArrayList<>();

    // write the (empty set of) measurements
    Iterator<Integer> results = createFunction().call(nothing.iterator());

    // expect a single count of 0; nothing written
    List<Integer> counts = IteratorUtils.toList(results);
    Assert.assertEquals(1, counts.size());
    Assert.assertEquals(0, counts.get(0).intValue());
  }

  /**
   * Creates the function under test, backed by the mock HBase table provider.
   */
  private HBaseWriterFunction createFunction() {
    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
    return function;
  }

  /**
   * Create a list of measurements for testing.
   *
   * @param count The number of messages to create.
   * @param entity The entity.
   * @param timestamp The timestamp.
   * @param profile The profile definition.
   * @return The measurements, each wrapped in a {@link ProfileMeasurementAdapter}.
   */
  private List<ProfileMeasurementAdapter> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
    List<ProfileMeasurementAdapter> adapters = new ArrayList<>();

    int created = 0;
    while(created < count) {
      ProfileMeasurement measurement = new ProfileMeasurement()
              .withProfileName(profile.getProfile())
              .withEntity(entity)
              .withPeriod(timestamp, 15, TimeUnit.MINUTES);

      // wrap the measurement using the adapter
      adapters.add(new ProfileMeasurementAdapter(measurement));
      created++;
    }

    return adapters;
  }

  /**
   * Returns a telemetry message to use for testing.
   */
  private JSONObject getMessage() {
    JSONObject message = new JSONObject();
    message.put("ip_src_addr", "192.168.1.1");
    message.put("status", "red");
    message.put("timestamp", System.currentTimeMillis());
    return message;
  }

  /**
   * Returns profiler properties to use for testing; intentionally empty so
   * that default values are used.
   */
  private Properties getProfilerProperties() {
    return new Properties();
  }

  /**
   * Returns a profile definition to use for testing.
   */
  private ProfileConfig getProfile() {
    return new ProfileConfig()
            .withProfile("profile1")
            .withForeach("ip_src_addr")
            .withUpdate("count", "count + 1")
            .withResult("count");
  }
}