GitHub user nickwallen reopened a pull request:
https://github.com/apache/metron/pull/1229
METRON-1809 Support Column Oriented Input with Batch Profiler
The Batch Profiler currently only accepts input formats that can be
directly serialized to JSON. This PR enhances that support to accept a wider
variety of input formats, most notably columnar formats like ORC and Parquet.
My first attempt at this in #1191 didn't go far enough. The tests I had
written only covered reading a single text column from ORC, which is not how
someone would actually store the data in ORC; each field of the message would
be stored in a separate column. The tests failed to exercise this difference.
## Changes
* This introduces the `TelemetryReader` interface that handles differences
between input formats.
* This provides two `TelemetryReader` implementations: one handles textual
input that can be directly serialized, like JSON or CSV; the other handles
column-oriented input like ORC and Parquet.
* A new property was introduced so the user can define which
`TelemetryReader` implementation is used.
* The README was updated to define the new property and provide examples
for common formats.
* Unit and integration tests were added or updated to test the new
functionality.
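The reader abstraction described above can be pictured with a minimal sketch. Only the `TelemetryReader` interface name comes from this PR; the implementation class names, the method shown, and the property-based selection logic below are illustrative assumptions, not the PR's actual code (the real readers would presumably produce a Spark `Dataset` of messages).

```java
import java.util.Properties;

// Illustrative sketch only: implementation names and selection logic are guesses.
public class TelemetryReaderSketch {

    interface TelemetryReader {
        // In the real code this would produce the telemetry messages,
        // e.g. as a Spark Dataset<String>; a simple label stands in here.
        String describe();
    }

    static class TextTelemetryReader implements TelemetryReader {
        public String describe() { return "text"; }     // JSON, CSV, ...
    }

    static class ColumnarTelemetryReader implements TelemetryReader {
        public String describe() { return "columnar"; } // ORC, Parquet, ...
    }

    // The new property selects which reader interprets the input.
    static TelemetryReader createReader(Properties props) {
        String format = props.getProperty("profiler.batch.input.format", "text");
        switch (format) {
            case "text":
            case "json":
            case "csv":
                return new TextTelemetryReader();
            default:
                // Anything else, e.g. the ORC data source class, is columnar.
                return new ColumnarTelemetryReader();
        }
    }
}
```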
## Test Plan
1. Stand-up a development environment.
```
cd metron-deployment/development/centos6
vagrant up
vagrant ssh
sudo su -
```
1. Validate the environment by ensuring alerts are visible within the
Alerts UI and that the Metron Service Check in Ambari passes.
1. Allow some telemetry to be archived in HDFS.
```
[root@node1 ~]# hdfs dfs -cat /apps/metron/indexing/indexed/*/* | wc -l
6916
```
1. Shut down the Metron topologies, Storm, Elasticsearch, Kibana, and
MapReduce2 to free up resources on the VM.
1. Use Ambari to install Spark (version 2.3+). Actions > Add Service >
Spark2
1. Make sure Spark can talk to HBase.
```
SPARK_HOME=/usr/hdp/current/spark2-client
cp /usr/hdp/current/hbase-client/conf/hbase-site.xml $SPARK_HOME/conf/
```
1. Follow the Getting Started section of the README to seed a basic profile
using the text/json telemetry that is archived in HDFS.
1. Create the Profile.
```
[root@node1 ~]# source /etc/default/metron
[root@node1 ~]# cat $METRON_HOME/config/zookeeper/profiler.json
{
  "profiles": [
    {
      "profile": "hello-world",
      "foreach": "'global'",
      "init": { "count": "0" },
      "update": { "count": "count + 1" },
      "result": "count"
    }
  ],
  "timestampField": "timestamp"
}
```
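As a sanity check on what this profile computes, here is a minimal simulation of the standard Profiler semantics: `init` runs once per period, `update` runs once per message, and `result` is emitted at the end of each period. This is an illustrative sketch, not Metron code; the names below are made up for the example.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the "hello-world" profile: count messages per 15-minute period.
public class HelloWorldProfileSketch {
    static final long PERIOD_MS = 15 * 60 * 1000;  // profiler.period.duration=15 MINUTES

    // Groups messages by the period containing their "timestampField" value,
    // mirroring init count=0 / update count=count+1 / result count.
    static Map<Long, Long> profileCounts(long[] timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(ts / PERIOD_MS, 1L, Long::sum);
        }
        return counts;
    }
}
```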
1. Edit the Batch Profiler properties to point at the correct input
path (changed localhost:9000 to localhost:8020).
```
[root@node1 ~]# cat /usr/metron/0.5.1/config/batch-profiler.properties
spark.app.name=Batch Profiler
spark.master=local
spark.sql.shuffle.partitions=8
profiler.batch.input.path=hdfs://localhost:8020/apps/metron/indexing/indexed/*/*
profiler.batch.input.format=text
profiler.period.duration=15
profiler.period.duration.units=MINUTES
```
1. Edit logging as you see fit. For example, set Spark logging to WARN
and Profiler logging to DEBUG. This is described in the README.
1. Run the Batch Profiler.
```
$METRON_HOME/bin/start_batch_profiler.sh
```
1. Launch the Stellar REPL and retrieve the profile data. Save this result
as it will be used for validation in subsequent steps.
```
[root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
...
Stellar, Go!
Functions are loading lazily in the background and will be unavailable
until loaded fully.
...
[Stellar]>>> window := PROFILE_FIXED(2, "HOURS")
[ProfilePeriod{period=1707332, durationMillis=900000},
ProfilePeriod{period=1707333, durationMillis=900000},
ProfilePeriod{period=1707334, durationMillis=900000},
ProfilePeriod{period=1707335, durationMillis=900000},
ProfilePeriod{period=1707336, durationMillis=900000},
ProfilePeriod{period=1707337, durationMillis=900000},
ProfilePeriod{period=1707338, durationMillis=900000},
ProfilePeriod{period=1707339, durationMillis=900000},
ProfilePeriod{period=1707340, durationMillis=900000}]
[Stellar]>>> PROFILE_GET("hello-world","global", window)
[1020, 5066, 830]
```
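The nine period ids in the window above can be sanity-checked with a small sketch, under the assumption (an illustration, not the actual `ProfilePeriod` code) that a period id is simply `epochMillis / durationMillis`:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a 2-hour fixed window over 15-minute periods spans the 8 full
// periods behind "now" plus the current one, i.e. 9 consecutive ids.
public class FixedWindowSketch {
    static final long DURATION_MS = 900_000L;  // 15 minutes, matches durationMillis above

    static List<Long> fixedWindow(long nowMs, long hours) {
        long end = nowMs / DURATION_MS;
        long start = (nowMs - hours * 3_600_000L) / DURATION_MS;
        List<Long> ids = new ArrayList<>();
        for (long id = start; id <= end; id++) {
            ids.add(id);
        }
        return ids;
    }
}
```

With now falling in period 1707340, a 2-hour window yields ids 1707332 through 1707340, matching the REPL output above.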
1. Delete the profiler data.
```
echo "truncate 'profiler'" | hbase shell
```
1. Create a new directory in HDFS for the ORC data that we are about to
generate.
```
export HADOOP_USER_NAME=hdfs
hdfs dfs -mkdir /apps/metron/orc
hdfs dfs -chown metron:hadoop /apps/metron/orc
```
1. You may also need to create this directory for Spark.
```
export HADOOP_USER_NAME=hdfs
hdfs dfs -mkdir /spark2-history
```
1. Launch the Spark shell.
```
export SPARK_MAJOR_VERSION=2
export HADOOP_USER_NAME=hdfs
spark-shell
```
1. Use the Spark Shell to transform the text/json telemetry to ORC.
```
scala> val jsonPath = "hdfs://localhost:8020/apps/metron/indexing/indexed/*/*"
jsonPath: String = hdfs://localhost:8020/apps/metron/indexing/indexed/*/*
scala> val orcPath = "hdfs://localhost:8020/apps/metron/orc/"
orcPath: String = hdfs://localhost:8020/apps/metron/orc/
scala> val msgs = spark.read.format("json").load(jsonPath).as[String]
msgs: org.apache.spark.sql.Dataset[String] = [value: string]
scala> msgs.count
res0: Long = 6916
scala> msgs.write.mode("overwrite").format("org.apache.spark.sql.execution.datasources.orc").save(orcPath)
scala> spark.read.format("org.apache.spark.sql.execution.datasources.orc").load(orcPath).as[String].count
res3: Long = 6916
```
1. Edit `$METRON_HOME/config/batch-profiler.properties` so that the Batch
Profiler consumes that telemetry stored as ORC.
```
[root@node1 ~]# cat /usr/metron/0.5.1/config/batch-profiler.properties
spark.app.name=Batch Profiler
spark.master=local
spark.sql.shuffle.partitions=8
profiler.batch.input.path=hdfs://localhost:8020/apps/metron/orc/
profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
profiler.period.duration=15
profiler.period.duration.units=MINUTES
```
1. Run the Batch Profiler again. It will now consume the ORC data.
```
$METRON_HOME/bin/start_batch_profiler.sh
```
1. Launch the Stellar REPL again and retrieve the profile data. It
should match the profile data previously generated from the text/json
telemetry.
```
[root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
...
Stellar, Go!
Functions are loading lazily in the background and will be unavailable
until loaded fully.
...
[Stellar]>>> window := PROFILE_FIXED(2, "HOURS")
[ProfilePeriod{period=1707332, durationMillis=900000},
ProfilePeriod{period=1707333, durationMillis=900000},
ProfilePeriod{period=1707334, durationMillis=900000},
ProfilePeriod{period=1707335, durationMillis=900000},
ProfilePeriod{period=1707336, durationMillis=900000},
ProfilePeriod{period=1707337, durationMillis=900000},
ProfilePeriod{period=1707338, durationMillis=900000},
ProfilePeriod{period=1707339, durationMillis=900000},
ProfilePeriod{period=1707340, durationMillis=900000}]
[Stellar]>>> PROFILE_GET("hello-world","global", window)
[1020, 5066, 830]
```
1. Notice that the output is exactly the same regardless of which input
format was used.
## Pull Request Checklist
- [x] Is there a JIRA ticket associated with this PR? If not one needs to
be created at [Metron
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
- [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA
number you are trying to resolve? Pay particular attention to the hyphen "-"
character.
- [x] Has your PR been rebased against the latest commit within the target
branch (typically master)?
- [x] Have you included steps to reproduce the behavior or problem that is
being changed or addressed?
- [x] Have you included steps or a guide to how the change may be verified
and tested manually?
- [x] Have you ensured that the full suite of tests and checks have been
executed in the root metron folder via:
- [x] Have you written or updated unit tests and or integration tests to
verify your changes?
- [x] If adding new dependencies to the code, are these dependencies
licensed in a way that is compatible for inclusion under [ASF
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] Have you verified the basic functionality of the build by building
and running locally with Vagrant full-dev environment or the equivalent?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/nickwallen/metron PROFILER-COLUMN-READER
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/metron/pull/1229.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1229
----
commit 41f6677d0a32ea7f43e346e93647b04eebcecce9
Author: Nick Allen <nick@...>
Date: 2018-10-05T19:50:19Z
Initial pass at a column reader for Batch Profiler
commit bd013528f065d968bf3bdecdb094443982a54e46
Author: Nick Allen <nick@...>
Date: 2018-10-05T20:48:32Z
Finished up
commit 65e5a25c41030eaf74748c5a7e4def3584f0ea1f
Author: Nick Allen <nick@...>
Date: 2018-10-05T20:58:04Z
Improved README
commit f2f8fd497cdc1ce3f5788713f9d73a65c2992288
Author: Nick Allen <nick@...>
Date: 2018-10-05T21:00:49Z
Removed comment
commit 61e4c218fdefe5bfd475e10c2afc5405bdd64ded
Author: Nick Allen <nick@...>
Date: 2018-10-06T15:21:36Z
Added license header
commit 9899d4d1328336c073ca542a3045309a76fb1b64
Author: Nick Allen <nick@...>
Date: 2018-10-06T15:22:39Z
Merge remote-tracking branch 'apache/master' into PROFILER-COLUMN-READER
----
---