GitHub user nickwallen reopened a pull request:

    https://github.com/apache/metron/pull/1229

    METRON-1809 Support Column Oriented Input with Batch Profiler

The Batch Profiler currently accepts only input formats that can be directly serialized to JSON.  This PR broadens that support to accept a wider variety of input formats, most notably columnar formats like ORC and Parquet.
    
My first attempt at this in #1191 didn't go far enough.  The tests I had written only covered reading a single text column from ORC, which is not how someone would actually store the data in ORC; each field of the message would be stored in a separate column.  The tests failed to exercise this difference.
    
    ## Changes
    
    * This introduces the `TelemetryReader` interface that handles differences 
between input formats. 
    
    * This provides two `TelemetryReader` implementations: one that handles textual input that can be directly serialized, like JSON or CSV, and another that handles column-oriented input, like ORC and Parquet.
    
    * A new property was introduced that allows the user to define which `TelemetryReader` implementation to use.
    
    * The README was updated to define the new property and provide examples 
for common formats.
    
    * Unit and integration tests were added or updated to test the new 
functionality.
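
    The reader abstraction might be sketched roughly as follows.  This is a simplified, hypothetical illustration, not the actual Metron code: the real readers operate on Spark Datasets, while here plain strings and invented class names stand in so the idea is self-contained.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical, simplified sketch of the reader abstraction.  The real
    // TelemetryReader works with Spark; plain JSON strings stand in here.
    interface TelemetryReader {
        List<String> read(List<String> rawInput);
    }

    // Textual input: each record is already a serialized JSON message.
    class TextTelemetryReader implements TelemetryReader {
        public List<String> read(List<String> rawInput) {
            return rawInput;  // already one JSON message per record
        }
    }

    // Column-oriented input: each field arrives in a separate column and
    // must be re-assembled into a single JSON message per record.
    class ColumnarTelemetryReader implements TelemetryReader {
        public List<String> read(List<String> rawInput) {
            return rawInput.stream()
                    .map(row -> {
                        String[] cols = row.split(",");
                        return "{\"ip_src_addr\":\"" + cols[0] + "\",\"timestamp\":" + cols[1] + "}";
                    })
                    .collect(Collectors.toList());
        }
    }

    public class TelemetryReaderSketch {
        public static void main(String[] args) {
            TelemetryReader reader = new ColumnarTelemetryReader();
            List<String> msgs = reader.read(Arrays.asList("192.168.66.1,1538760000000"));
            System.out.println(msgs.get(0));
            // prints {"ip_src_addr":"192.168.66.1","timestamp":1538760000000}
        }
    }
    ```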
    
    ## Test Plan
    
    1. Stand-up a development environment.
    
        ```
        cd metron-deployment/development/centos6
        vagrant up
        vagrant ssh
        sudo su -
        ```
    
    1. Validate the environment by ensuring alerts are visible within the 
Alerts UI and that the Metron Service Check in Ambari passes.
    
    1. Allow some telemetry to be archived in HDFS.
        ```
        [root@node1 ~]# hdfs dfs -cat /apps/metron/indexing/indexed/*/* | wc -l
        6916
        ```
    
    1. Shut down the Metron topologies, Storm, Elasticsearch, Kibana, and MapReduce2 to free up resources on the VM.
    
    1. Use Ambari to install Spark (version 2.3+).  Actions > Add Service > 
Spark2
    
    1. Make sure Spark can talk to HBase.
    
        ```
        SPARK_HOME=/usr/hdp/current/spark2-client
        cp /usr/hdp/current/hbase-client/conf/hbase-site.xml $SPARK_HOME/conf/
        ```
    
    1. Follow the Getting Started section of the README to seed a basic profile 
using the text/json telemetry that is archived in HDFS.
    
        1. Create the Profile.
    
            ```
            [root@node1 ~]# source /etc/default/metron
            [root@node1 ~]# cat $METRON_HOME/config/zookeeper/profiler.json
            {
              "profiles": [
                {
                  "profile": "hello-world",
                  "foreach": "'global'",
                  "init":    { "count": "0" },
                  "update":  { "count": "count + 1" },
                  "result":  "count"
                }
              ],
              "timestampField": "timestamp"
            }
            ```
    
        1. Edit the Batch Profiler properties to point at the correct input path (changed localhost:9000 to localhost:8020).
            ```
            [root@node1 ~]# cat /usr/metron/0.5.1/config/batch-profiler.properties
    
            spark.app.name=Batch Profiler
            spark.master=local
            spark.sql.shuffle.partitions=8
    
            profiler.batch.input.path=hdfs://localhost:8020/apps/metron/indexing/indexed/*/*
            profiler.batch.input.format=text
    
            profiler.period.duration=15
            profiler.period.duration.units=MINUTES
            ```
    
        1. Edit logging as you see fit.  For example, set Spark logging to WARN 
and Profiler logging to DEBUG. This is described in the README.
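
            For example, a fragment along these lines in `${SPARK_HOME}/conf/log4j.properties` (the exact logger names are an assumption; check the README and the log4j template shipped with Spark):
            ```
            # Quiet Spark, verbose Profiler (logger names assumed; verify locally)
            log4j.logger.org.apache.spark=WARN
            log4j.logger.org.apache.metron.profiler=DEBUG
            ```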
    
        1. Run the Batch Profiler.
            ```
            $METRON_HOME/bin/start_batch_profiler.sh
            ```
    
    1. Launch the Stellar REPL and retrieve the profile data.  Save this result 
as it will be used for validation in subsequent steps.
        ```
        [root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
        ...
        Stellar, Go!
        Functions are loading lazily in the background and will be unavailable until loaded fully.
        ...
        [Stellar]>>> window := PROFILE_FIXED(2, "HOURS")
        [ProfilePeriod{period=1707332, durationMillis=900000}, ProfilePeriod{period=1707333, durationMillis=900000}, ProfilePeriod{period=1707334, durationMillis=900000}, ProfilePeriod{period=1707335, durationMillis=900000}, ProfilePeriod{period=1707336, durationMillis=900000}, ProfilePeriod{period=1707337, durationMillis=900000}, ProfilePeriod{period=1707338, durationMillis=900000}, ProfilePeriod{period=1707339, durationMillis=900000}, ProfilePeriod{period=1707340, durationMillis=900000}]
    
        [Stellar]>>> PROFILE_GET("hello-world","global", window)
        [1020, 5066, 830]
        ```
    
    1. Delete the profiler data.
        ```
        echo "truncate 'profiler'" | hbase shell
        ```
    
    1. Create a new directory in HDFS for the ORC data that we are about to 
generate.
        ```
        export HADOOP_USER_NAME=hdfs
        hdfs dfs -mkdir /apps/metron/orc
        hdfs dfs -chown metron:hadoop /apps/metron/orc
        ```
    
    1. You may also need to create this directory for Spark.
        ```
        export HADOOP_USER_NAME=hdfs
        hdfs dfs -mkdir /spark2-history
        ```
    
    1. Launch the Spark shell.
        ```
        export SPARK_MAJOR_VERSION=2
        export HADOOP_USER_NAME=hdfs
        spark-shell
        ```
    
    1. Use the Spark Shell to transform the text/json telemetry to ORC.
        ```
        scala> val jsonPath = "hdfs://localhost:8020/apps/metron/indexing/indexed/*/*"
        jsonPath: String = hdfs://localhost:8020/apps/metron/indexing/indexed/*/*

        scala> val orcPath = "hdfs://localhost:8020/apps/metron/orc/"
        orcPath: String = hdfs://localhost:8020/apps/metron/orc/

        scala> val msgs = spark.read.format("json").load(jsonPath).as[String]
        msgs: org.apache.spark.sql.Dataset[String] = [value: string]

        scala> msgs.count
        res0: Long = 6916

        scala> msgs.write.mode("overwrite").format("org.apache.spark.sql.execution.datasources.orc").save(orcPath)

        scala> spark.read.format("org.apache.spark.sql.execution.datasources.orc").load(orcPath).as[String].count
        res3: Long = 6916
        ```
    
    1. Edit `$METRON_HOME/config/batch-profiler.properties` so that the Batch 
Profiler consumes that telemetry stored as ORC.
        ```
        [root@node1 ~]# cat /usr/metron/0.5.1/config/batch-profiler.properties
    
        spark.app.name=Batch Profiler
        spark.master=local
        spark.sql.shuffle.partitions=8
    
        profiler.batch.input.path=hdfs://localhost:8020/apps/metron/orc/
        profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
    
        profiler.period.duration=15
        profiler.period.duration.units=MINUTES
        ```
    
    1. Run the Batch Profiler again.  It will now consume the ORC data.
        ```
        $METRON_HOME/bin/start_batch_profiler.sh
        ```
    
    1. Launch the Stellar REPL again and retrieve the profile data.  The data should match the previous profile data that was generated using the text/json telemetry.
        ```
        [root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
        ...
        Stellar, Go!
        Functions are loading lazily in the background and will be unavailable until loaded fully.
        ...
        [Stellar]>>> window := PROFILE_FIXED(2, "HOURS")
        [ProfilePeriod{period=1707332, durationMillis=900000}, ProfilePeriod{period=1707333, durationMillis=900000}, ProfilePeriod{period=1707334, durationMillis=900000}, ProfilePeriod{period=1707335, durationMillis=900000}, ProfilePeriod{period=1707336, durationMillis=900000}, ProfilePeriod{period=1707337, durationMillis=900000}, ProfilePeriod{period=1707338, durationMillis=900000}, ProfilePeriod{period=1707339, durationMillis=900000}, ProfilePeriod{period=1707340, durationMillis=900000}]
    
        [Stellar]>>> PROFILE_GET("hello-world","global", window)
        [1020, 5066, 830]
        ```
    
    1. Notice that the output is exactly the same regardless of which input format was used.
    
    
    ## Pull Request Checklist
    
    - [x] Is there a JIRA ticket associated with this PR? If not one needs to 
be created at [Metron 
Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
    - [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
    - [x] Have you included steps to reproduce the behavior or problem that is 
being changed or addressed?
    - [x] Have you included steps or a guide to how the change may be verified 
and tested manually?
    - [x] Have you ensured that the full suite of tests and checks have been 
executed in the root metron folder via:
    - [x] Have you written or updated unit tests and or integration tests to 
verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] Have you verified the basic functionality of the build by building 
and running locally with Vagrant full-dev environment or the equivalent?


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron PROFILER-COLUMN-READER

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1229.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1229
    
----
commit 41f6677d0a32ea7f43e346e93647b04eebcecce9
Author: Nick Allen <nick@...>
Date:   2018-10-05T19:50:19Z

    Initial pass at a column reader for Batch Profiler

commit bd013528f065d968bf3bdecdb094443982a54e46
Author: Nick Allen <nick@...>
Date:   2018-10-05T20:48:32Z

    Finished up

commit 65e5a25c41030eaf74748c5a7e4def3584f0ea1f
Author: Nick Allen <nick@...>
Date:   2018-10-05T20:58:04Z

    Improved README

commit f2f8fd497cdc1ce3f5788713f9d73a65c2992288
Author: Nick Allen <nick@...>
Date:   2018-10-05T21:00:49Z

    Removed comment

commit 61e4c218fdefe5bfd475e10c2afc5405bdd64ded
Author: Nick Allen <nick@...>
Date:   2018-10-06T15:21:36Z

    Added license header

commit 9899d4d1328336c073ca542a3045309a76fb1b64
Author: Nick Allen <nick@...>
Date:   2018-10-06T15:22:39Z

    Merge remote-tracking branch 'apache/master' into PROFILER-COLUMN-READER

----