GitHub user nickwallen opened a pull request:
https://github.com/apache/metron/pull/1150
METRON-1707 Port Profiler to Spark
This ports the Profiler to run in Spark.
This is a pull request against the `METRON-1699-create-batch-profiler`
feature branch.
This depends on #1145 #1146 #1148 #1147. By filtering on the last
commit, this PR can be reviewed before the others have been reviewed and merged.
## Testing
### Test Against Regressions
1. Launch the development environment.
1. Ensure alerts are created in the Alerts UI.
1. Ensure the Service Check in Ambari passes.
1. Follow the instructions in the Profiler README to create a basic profile
in the REPL.
1. Follow the instructions in the Profiler README to create a simple
profile using the Profiler topology in Storm.
### Test New Functionality
1. Start up the development environment. Allow Metron to run for a while so
that a fair amount of telemetry is archived in HDFS.
1. Stop all Metron services.
1. Install Spark2 using Ambari.
* Use Add Service > Spark2, then follow prompts.
1. Define a profile by creating and editing a `profiler.json` file as
follows.
```
[root@node1 0.5.1]# cat $METRON_HOME/config/zookeeper/profiler.json
{
"profiles": [
{
"profile": "hello-world",
"foreach": "'global'",
"init": { "count": "0" },
"update": { "count": "count + 1" },
"result": "count"
}
],
"timestampField": "timestamp"
}
```
1. Save the same Profiler configuration to Zookeeper.
```
[root@node1 0.5.1]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
Stellar, Go!
[Stellar]>>> conf := SHELL_EDIT()
{
"profiles": [
{
"profile": "hello-world",
"foreach": "'global'",
"init": { "count": "0" },
"update": { "count": "count + 1" },
"result": "count"
}
],
"timestampField": "timestamp"
}
[Stellar]>>>
[Stellar]>>> CONFIG_PUT("PROFILER", conf)
```
1. Using Ambari, make the following configuration changes for the Profiler.
* Profiler Setup > Profile duration = 1 minute
* Profiler Setup > Window duration = 10 seconds
* Profiler Setup > Window time lag = 10 seconds
* Profiler Setup > Time to Live = 5 minutes
* Kafka > Input Topic Start = EARLIEST
1. Count the number of messages in the 'indexing' topic. This count should not
be changing, since all Metron services have been stopped.
```
[root@node1 ~]# /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list $BROKERLIST \
    --topic indexing \
    --time -1
indexing:0:8130
```
In this case there are 8,130 messages.
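GetOffsetShell prints one `topic:partition:offset` line per partition, so the total message count is the sum of the offsets across partitions. A minimal shell sketch of that totaling (the sample line below mimics the single-partition output above):

```shell
# Sum per-partition offsets from GetOffsetShell output.
# The sample input mimics the topic:partition:offset lines shown above.
offsets="indexing:0:8130"

total=0
for line in $offsets; do
  total=$((total + ${line##*:}))   # take the field after the last ':'
done
echo "$total"                      # prints 8130
```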
1. Delete any previous profile measurements from HBase.
```
[root@node1 ~]# hbase shell
...
hbase(main):001:0> truncate 'profiler'
Truncating 'profiler' table (it may take a while):
- Disabling table...
- Truncating table...
0 row(s) in 4.1070 seconds
```
1. Confirm that all of the messages were successfully indexed in HDFS.
```
[root@node1 ~]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/* | wc -l
4065
[root@node1 ~]# hdfs dfs -cat /apps/metron/indexing/indexed/snort/* | wc -l
4065
[root@node1 ~]# hdfs dfs -cat /apps/metron/indexing/indexed/*/* | wc -l
8130
```
* Remember that we found 8,130 in the indexing topic previously.
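The per-sensor HDFS counts can be cross-checked against the topic offset with simple shell arithmetic (counts taken from the output above):

```shell
# Cross-check: per-sensor HDFS record counts should sum to the
# number of messages observed in the indexing topic.
bro=4065
snort=4065
topic=8130
if [ $((bro + snort)) -eq "$topic" ]; then
  echo "counts match"
fi
```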
1. Now deploy the Batch Profiler. Deployment is not yet handled by
the feature branch, so this must be done manually.
1. Copy the CLI script and Batch Profiler jar to the VM.
From the host machine (outside the development VM), run the following.
```
vagrant scp ../../../metron-analytics/metron-profiler-spark/target/metron-profiler-spark-0.5.1.jar /tmp
vagrant scp ../../../metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh /tmp
```
```
Then from the development VM, run the following.
```
cp /tmp/start_batch_profiler.sh $METRON_HOME/bin/
cp /tmp/metron-profiler-spark-0.5.1.jar $METRON_HOME/lib/
```
1. Create a properties file for the Batch Profiler like so.
```
[root@node1 0.5.1]# cat config/batch-profiler.properties
spark.master=local
spark.app.name=Batch Profiler
spark.sql.shuffle.partitions=8
profiler.batch.input.path=hdfs://node1:8020/apps/metron/indexing/indexed/*/*
profiler.period.duration=1
profiler.period.duration.units=MINUTES
```
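The period settings determine how telemetry is bucketed: each measurement covers `profiler.period.duration` in the given units. A quick sketch of the arithmetic, assuming 1-minute periods as configured above (the 30-minute span is a hypothetical round number for illustration):

```shell
# Each profile period covers duration * units worth of telemetry.
# With duration=1 and units=MINUTES, one period is 60,000 ms, so a
# telemetry span of ~30 minutes yields roughly 30 measurements.
duration=1
period_ms=$((duration * 60 * 1000))
span_ms=1800000                      # hypothetical 30-minute span
echo $((span_ms / period_ms))        # prints 30
```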
1. Execute the following to run the Batch Profiler.
```
source /etc/default/metron
export METRON_VERSION=0.5.1
export SPARK_MAJOR_VERSION=2
cp /usr/hdp/current/hbase-client/conf/hbase-site.xml /usr/hdp/current/spark2-client/conf/
cp /usr/hdp/current/spark2-client/conf/log4j.properties.template /usr/hdp/current/spark2-client/conf/log4j.properties
echo "log4j.logger.org.apache.metron.profiler.spark=DEBUG" >> /usr/hdp/current/spark2-client/conf/log4j.properties
export PATH=$PATH:/usr/hdp/current/spark2-client/bin/
spark-submit \
--class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
--properties-file ${METRON_HOME}/config/batch-profiler.properties \
${METRON_HOME}/lib/metron-profiler-spark-${METRON_VERSION}.jar \
--config ${METRON_HOME}/config/batch-profiler.properties \
--profiles ${METRON_HOME}/config/zookeeper/profiler.json
```
1. You may want to edit the `log4j.properties` file that sits in the config
directory under `$SPARK_HOME`, or create one if it does not already exist.
```
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
#log4j.logger.org.apache.metron.profiler=DEBUG
log4j.logger.org.apache.metron.profiler.spark=DEBUG
```
1. You should see something like the following.
```
[root@node1 conf]# $METRON_HOME/bin/start_batch_profiler.sh
SPARK_MAJOR_VERSION is set to 2, using Spark2
Warning: Ignoring non-spark config property: profiler.batch.input.path=hdfs://localhost:8020/apps/metron/indexing/indexed/*/*
Warning: Ignoring non-spark config property: profiler.period.duration=1
Warning: Ignoring non-spark config property: profiler.period.duration.units=MINUTES
18/08/01 19:15:17 WARN Utils: Your hostname, node1 resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
18/08/01 19:15:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/08/01 19:15:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/01 19:15:18 INFO BatchProfilerCLI: Loading profiler properties from '/usr/metron/0.5.1/config/batch-profiler.properties'
18/08/01 19:15:18 INFO BatchProfilerCLI: Properties = {spark.app.name=Batch Profiler, profiler.batch.input.path=hdfs://localhost:8020/apps/metron/indexing/indexed/*/*, profiler.period.duration.units=MINUTES, profiler.period.duration=1, spark.sql.shuffle.partitions=8, spark.master=local}
18/08/01 19:15:18 INFO BatchProfilerCLI: Loading profiles from '/usr/metron/0.5.1/config/zookeeper/profiler.json'
18/08/01 19:15:18 INFO BatchProfilerCLI: Loaded 1 profile(s)
18/08/01 19:15:19 DEBUG BatchProfiler: Building 1 profile(s)
18/08/01 19:15:19 DEBUG BatchProfiler: Loading telemetry from 'hdfs://localhost:8020/apps/metron/indexing/indexed/*/*'
18/08/01 19:15:21 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
18/08/01 19:15:26 DEBUG BatchProfiler: Found 8130 telemetry record(s)
ANTLR Tool version 4.5 used for code generation does not match the current runtime version 4.7
ANTLR Runtime version 4.5 used for parser compilation does not match the current runtime version 4.7
ANTLR Tool version 4.5 used for code generation does not match the current runtime version 4.7
ANTLR Runtime version 4.5 used for parser compilation does not match the current runtime version 4.7
18/08/01 19:15:28 DEBUG BatchProfiler: Generated 8130 message route(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552359' from 320 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552359, value=320
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552363' from 210 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552363, value=210
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552365' from 260 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552365, value=260
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552367' from 250 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552367, value=250
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552369' from 280 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552369, value=280
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552375' from 270 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552375, value=270
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552374' from 240 message(s)
18/08/01 19:15:29 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552374, value=240
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552378' from 320 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552378, value=320
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552355' from 340 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552355, value=340
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552361' from 280 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552361, value=280
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552372' from 250 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552372, value=250
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552373' from 280 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552373, value=280
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552379' from 320 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552379, value=320
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552352' from 180 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552352, value=180
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552353' from 240 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552353, value=240
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552360' from 320 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552360, value=320
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552362' from 290 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552362, value=290
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552376' from 310 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552376, value=310
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552357' from 330 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552357, value=330
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552381' from 130 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552381, value=130
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552356' from 320 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552356, value=320
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552366' from 260 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552366, value=260
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552370' from 260 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552370, value=260
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552368' from 280 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552368, value=280
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552377' from 230 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552377, value=230
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552380' from 290 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552380, value=290
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552354' from 310 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552354, value=310
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552358' from 330 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552358, value=330
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552364' from 220 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552364, value=220
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Building a profile for group 'hello-world-global-25552371' from 210 message(s)
18/08/01 19:15:30 DEBUG ProfileBuilderFunction: Profile measurement created; profile=hello-world, entity=global, period=25552371, value=210
18/08/01 19:15:30 DEBUG BatchProfiler: Produced 30 profile measurement(s)
18/08/01 19:15:31 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: 6 profile measurement(s) written to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: 2 profile measurement(s) written to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: 5 profile measurement(s) written to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: 5 profile measurement(s) written to HBase
18/08/01 19:15:32 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: 2 profile measurement(s) written to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: 3 profile measurement(s) written to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: 3 profile measurement(s) written to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: About to write profile measurement(s) to HBase
18/08/01 19:15:33 DEBUG HBaseWriterFunction: 4 profile measurement(s) written to HBase
18/08/01 19:15:33 DEBUG BatchProfiler: 30 profile measurement(s) written to HBase
18/08/01 19:15:33 INFO BatchProfilerCLI: Profiler produced 30 profile measurement(s)
```
1. Fetch the profile measurements created by the Profiler.
The Profiler counted a couple hundred messages each minute.
```
[Stellar]>>> m := PROFILE_GET("hello-world", "global", PROFILE_FIXED(30, "DAYS"))
[180, 240, 310, 340, 320, 330, 330, 320, 320, 280, 290, 210, 220, 260, 260, 250, 280, 280, 260, 210, 250, 280, 240, 270, 310, 230, 320, 320, 290, 130]
```
Overall, there were 30 measurements captured from the archived
telemetry.
```
[Stellar]>>> LENGTH(m)
30
```
The Profiler counted a total of 8,130 messages.
```
[Stellar]>>> REDUCE(m, (l,r)->l+r, 0)
8130
```
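The period IDs that appear in the debug output (for example `25552352`) appear to be the message timestamp divided by the period duration. A hedged shell sketch of that arithmetic, assuming millisecond epoch timestamps (the timestamp below is a hypothetical value from 2018-08-01, not taken from the actual telemetry):

```shell
# Period ID = epoch timestamp (ms) / period duration (ms), assuming
# that is how the Profiler buckets messages into 1-minute periods.
period_ms=$((60 * 1000))
ts_ms=1533141155000              # hypothetical timestamp, 2018-08-01 (assumption)
echo $((ts_ms / period_ms))      # prints 25552352
```

All messages whose timestamps fall in the same minute share a period ID, which is why each `hello-world-global-<period>` group in the log aggregates one minute of telemetry.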
1. You may need to create the Spark history directory in HDFS if doing this
in Full Dev.
```
export HADOOP_USER_NAME=hdfs
hdfs dfs -mkdir /spark2-history
```
1. Validate the range of time over which we have telemetry.
In the spark shell (`/usr/hdp/current/spark2-client/bin/spark-shell`)
run the following.
```
scala> val msgs = spark.read.json("hdfs://localhost:8020/apps/metron/indexing/indexed/*/*")
msgs: org.apache.spark.sql.DataFrame = [AA: boolean, RA: boolean ... 91 more fields]

scala> msgs.count()
res15: Long = 8130

scala> msgs.select(max("timestamp") - min("timestamp")).show()
+---------------------------------+
|(max(timestamp) - min(timestamp))|
+---------------------------------+
|                          1769980|
+---------------------------------+
```
We see that 1,769,980 milliseconds is just under 30 minutes. That matches
the 30 one-minute measurements captured by the Profiler.
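The arithmetic behind that sanity check, as a quick shell sketch:

```shell
# Convert the telemetry span from the Spark query into whole minutes.
span_ms=1769980
echo $((span_ms / 60000))        # prints 29, i.e. just under 30 minutes
```

Note that just under 30 minutes of telemetry can still straddle 30 distinct one-minute period boundaries, which is consistent with the 30 measurements.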
## Pull Request Checklist
- [x] Have you included steps to reproduce the behavior or problem that is
being changed or addressed?
- [x] Have you included steps or a guide to how the change may be verified
and tested manually?
- [x] Have you ensured that the full suite of tests and checks have been
executed in the root metron folder via:
- [x] Have you written or updated unit tests and or integration tests to
verify your changes?
- [x] If adding new dependencies to the code, are these dependencies
licensed in a way that is compatible for inclusion under [ASF
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [x] Have you verified the basic functionality of the build by building
and running locally with Vagrant full-dev environment or the equivalent?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/nickwallen/metron METRON-1707
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/metron/pull/1150.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1150
----
commit 6ce28594659928a8c87c57edddd22e1ab00d798d
Author: Nick Allen <nick@...>
Date: 2018-07-10T14:08:48Z
METRON-1703 Make Core Profiler Components Serializable
commit 0051359cbb277881de896526345bb4fce1d5139c
Author: Nick Allen <nick@...>
Date: 2018-07-10T19:42:19Z
METRON-1704 Message Timestamp Logic Should be Shared
commit 2413726bdf96221ec775a9c8de524e3ec92148b7
Author: Nick Allen <nick@...>
Date: 2018-07-27T17:20:15Z
METRON-1706: HbaseClient.mutate should return the number of mutations
commit 21980ca764b98ddb96c4c8732e0ef7a6c5ea2c56
Author: Nick Allen <nick@...>
Date: 2018-07-24T18:02:36Z
METRON-1705 Create ProfilePeriod Using Period ID
commit be15126419a2862864a7acd67349281b086f52cf
Author: Nick Allen <nick@...>
Date: 2018-07-31T19:26:20Z
METRON-1707 Port Profiler to Spark
----
---