Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1699-create-batch-profiler f83f0ac06 -> 1545978e1


METRON-1772 Support alternative input formats in the Batch Profiler 
(nickwallen) closes apache/metron#1191


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1545978e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1545978e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1545978e

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 1545978e169a01e4a06735b8713c8fa65373a394
Parents: f83f0ac
Author: nickwallen <n...@nickallen.org>
Authored: Wed Sep 19 10:11:28 2018 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Wed Sep 19 10:11:28 2018 -0400

----------------------------------------------------------------------
 .../metron-profiler-spark/README.md             | 47 +++++++++-
 metron-analytics/metron-profiler-spark/pom.xml  | 18 ++--
 .../metron/profiler/spark/BatchProfiler.java    | 21 +++--
 .../profiler/spark/cli/BatchProfilerCLI.java    | 40 +++++++--
 .../spark/cli/BatchProfilerCLIOptions.java      | 10 ++-
 .../spark/BatchProfilerIntegrationTest.java     | 91 +++++++++++++++++---
 6 files changed, 189 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 3d7017c..99e8c7e 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -131,6 +131,14 @@ The Batch Profiler requires Spark version 2.3.0+.
 
 ## Running the Profiler
 
+* [Usage](#usage)
+* [Advanced Usage](#advanced-usage)
+* [Spark Execution](#spark-execution)
+* [Kerberos](#kerberos)
+* [Input Formats](#input-formats)
+
+### Usage
+
 A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler.  This script makes the following assumptions.
 
   * The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
@@ -156,11 +164,28 @@ The Batch Profiler accepts the following arguments when run from the command line
 
 | Argument         | Description
 |---               |---
-| -p, --profiles   | The path to a file containing the profile definitions.
-| -c, --config     | The path to the profiler properties file.
-| -g, --globals    | The path to a properties file containing global properties.
+| -p, --profiles   | Path to the profile definitions.
+| -c, --config     | Path to the profiler properties file.
+| -g, --globals    | Path to the Stellar global config file.
+| -r, --reader     | Path to properties for the DataFrameReader.
 | -h, --help       | Print the help text.
 
+#### `--profiles`
+
+The path to a file containing the profile definition in JSON.
+
+#### `--config`
+
+The path to a file containing key-value properties for the Profiler. This file would contain the properties described under [Configuring the Profiler](#configuring-the-profiler).
+
+#### `--globals`
+
+The path to a file containing key-value properties that define the global properties. This can be used to customize how certain Stellar functions behave during execution.
+
+#### `--reader`
+
+The path to a file containing key-value properties that are passed to the DataFrameReader when reading the input telemetry. This allows additional customization for how the input telemetry is read.
+
 ### Spark Execution
 
 Spark supports a number of different [cluster managers](https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types).  The underlying cluster manager is transparent to the Profiler.  To run the Profiler on a particular cluster manager, it is just a matter of setting the appropriate options as defined in the Spark documentation.
@@ -191,10 +216,24 @@ The following command can be useful to review the logs generated when the Profiler
 ```
 
-#### Kerberos
+### Kerberos
 
 See the Spark documentation for information on running the Batch Profiler in a [secure, kerberized cluster](https://spark.apache.org/docs/latest/running-on-yarn.html#running-in-a-secure-cluster).
 
+### Input Formats
+
+The Profiler can consume archived telemetry stored in a variety of input formats.  By default, it is configured to consume the text/json that Metron archives in HDFS. This is often not the best format for archiving telemetry.  If you choose a different format, you should be able to configure the Profiler to consume it by doing the following.
+
+1. Edit [`profiler.batch.input.format`](#profilerbatchinputformat) and [`profiler.batch.input.path`](#profilerbatchinputpath) as needed.  For example, to read ORC you might do the following.
+
+  `$METRON_HOME/config/batch-profiler.properties`
+  ```
+  profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
+  profiler.batch.input.path=hdfs://localhost:9000/apps/metron/indexing/orc/\*/\*
+  ```
+
+1. If additional options are required for your input format, then use the [`--reader`](#--reader) command-line argument when launching the Batch Profiler as [described here](#advanced-usage).
+
 
 ## Configuring the Profiler
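
For illustration, a reader properties file is nothing more than plain key-value pairs; each entry is handed to Spark's DataFrameReader as an option before the input telemetry is loaded. The file name `reader.properties` below is a hypothetical example, not part of this commit; `header` and `sep` are standard Spark CSV reader options.

```
# reader.properties (hypothetical)
# each key-value pair becomes one DataFrameReader option
header=true
sep=,
```

It would then be supplied at launch with `--reader reader.properties`.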
 

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 587b38c..668ee2c 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -25,6 +25,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <spark_antlr_version>4.7</spark_antlr_version>
     </properties>
     <dependencies>
         <dependency>
@@ -36,12 +37,11 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>${global_spark_version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>antlr-runtime</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>${spark_antlr_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -53,6 +53,12 @@
             <artifactId>metron-profiler-client</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr4-runtime</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
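
The change above stops excluding Spark's ANTLR runtime and instead pins `antlr4-runtime` to the 4.7 release that Spark 2.3 expects, excluding the copy pulled in transitively through `metron-profiler-client`. To confirm which ANTLR runtime actually wins on the classpath, the standard Maven dependency report is sufficient; a sketch, assuming it is run from the project root:

```
mvn -pl metron-analytics/metron-profiler-spark dependency:tree -Dincludes=org.antlr
```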

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index f999613..d75abc3 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -54,25 +55,29 @@ public class BatchProfiler implements Serializable {
    * Execute the Batch Profiler.
    *
    * @param spark The spark session.
-   * @param properties The profiler configuration properties.
+   * @param profilerProps The profiler configuration properties.
+   * @param globalProperties The Stellar global properties.
+   * @param readerProps The properties passed to the {@link org.apache.spark.sql.DataFrameReader}.
    * @param profiles The profile definitions.
    * @return The number of profile measurements produced.
    */
   public long run(SparkSession spark,
-                  Properties properties,
+                  Properties profilerProps,
                   Properties globalProperties,
+                  Properties readerProps,
                   ProfilerConfig profiles) {
 
     LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
     Map<String, String> globals = Maps.fromProperties(globalProperties);
 
-    String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class);
-    String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);
+    String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
+    String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
     LOG.debug("Loading telemetry from '{}'", inputPath);
 
     // fetch the archived telemetry
     Dataset<String> telemetry = spark
             .read()
+            .options(Maps.fromProperties(readerProps))
             .format(inputFormat)
             .load(inputPath)
             .as(Encoders.STRING());
@@ -85,13 +90,13 @@ public class BatchProfiler implements Serializable {
 
     // build the profiles
     Dataset<ProfileMeasurementAdapter> measurements = routes
-            .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING())
-            .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+            .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
+            .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class));
     LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
 
     // write the profile measurements to HBase
     long count = measurements
-            .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT())
+            .mapPartitions(new HBaseWriterFunction(profilerProps), Encoders.INT())
             .agg(sum("value"))
             .head()
             .getLong(0);
@@ -99,4 +104,4 @@ public class BatchProfiler implements Serializable {
 
     return count;
   }
-}
\ No newline at end of file
+}
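
To make the reader plumbing concrete: Guava's `Maps.fromProperties` converts the `java.util.Properties` into a `Map<String, String>`, and `DataFrameReader.options()` applies every entry before the load. A minimal, self-contained sketch of the same pattern follows; the local master, file path, and option value are illustrative assumptions, and the input is assumed to be a single-column CSV file.

```
import com.google.common.collect.Maps;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class ReaderOptionsSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
            .master("local")                    // illustrative; any cluster manager works
            .appName("reader-options-sketch")
            .getOrCreate();

    // each key-value pair becomes one option on the DataFrameReader
    Properties readerProps = new Properties();
    readerProps.put("header", "true");

    // same pattern as BatchProfiler.run()
    Dataset<String> telemetry = spark
            .read()
            .options(Maps.fromProperties(readerProps))
            .format("csv")
            .load("/tmp/telemetry.csv")         // illustrative path; single string column assumed
            .as(Encoders.STRING());

    System.out.println(telemetry.count());
    spark.stop();
  }
}
```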

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index bdcf231..29fe4a2 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -37,9 +37,10 @@ import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.Properties;
 
-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.CONFIGURATION_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;
 
 /**
@@ -54,7 +55,8 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
  *     metron-profiler-spark-<version>.jar \
  *     --config profiler.properties \
  *     --globals global.properties \
- *     --profiles profiles.json
+ *     --profiles profiles.json \
+ *     --reader reader.properties
  * }</pre>
  */
 public class BatchProfilerCLI implements Serializable {
@@ -63,6 +65,7 @@ public class BatchProfilerCLI implements Serializable {
 
   public static Properties globals;
   public static Properties profilerProps;
+  public static Properties readerProps;
   public static ProfilerConfig profiles;
 
  public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
@@ -71,6 +74,7 @@ public class BatchProfilerCLI implements Serializable {
     profilerProps = handleProfilerProperties(commandLine);
     globals = handleGlobals(commandLine);
     profiles = handleProfileDefinitions(commandLine);
+    readerProps = handleReaderProperties(commandLine);
 
     // the batch profiler must use 'event time'
     if(!profiles.getTimestampField().isPresent()) {
@@ -88,7 +92,7 @@ public class BatchProfilerCLI implements Serializable {
             .getOrCreate();
 
     BatchProfiler profiler = new BatchProfiler();
-    long count = profiler.run(spark, profilerProps, globals, profiles);
+    long count = profiler.run(spark, profilerProps, globals, readerProps, profiles);
     LOG.info("Profiler produced {} profile measurement(s)", count);
   }
 
@@ -117,13 +121,31 @@ public class BatchProfilerCLI implements Serializable {
    */
  private static Properties handleProfilerProperties(CommandLine commandLine) throws IOException {
     Properties config = new Properties();
-    if(CONFIGURATION_FILE.has(commandLine)) {
-      String propertiesPath = CONFIGURATION_FILE.get(commandLine);
+    if(PROFILER_PROPS_FILE.has(commandLine)) {
+      String propertiesPath = PROFILER_PROPS_FILE.get(commandLine);
 
       LOG.info("Loading profiler properties from '{}'", propertiesPath);
       config.load(new FileInputStream(propertiesPath));
 
-      LOG.info("Properties = {}", config.toString());
+      LOG.info("Profiler properties = {}", config.toString());
+    }
+    return config;
+  }
+
+  /**
+   * Load the properties for the {@link org.apache.spark.sql.DataFrameReader}.
+   *
+   * @param commandLine The command line.
+   */
+  private static Properties handleReaderProperties(CommandLine commandLine) throws IOException {
+    Properties config = new Properties();
+    if(READER_PROPS_FILE.has(commandLine)) {
+      String readerPropsPath = READER_PROPS_FILE.get(commandLine);
+
+      LOG.info("Loading reader properties from '{}'", readerPropsPath);
+      config.load(new FileInputStream(readerPropsPath));
+
+      LOG.info("Reader properties = {}", config.toString());
     }
     return config;
   }
@@ -171,4 +193,8 @@ public class BatchProfilerCLI implements Serializable {
   public static ProfilerConfig getProfiles() {
     return profiles;
   }
-}
\ No newline at end of file
+
+  public static Properties getReaderProps() {
+    return readerProps;
+  }
+}
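
One small observation on `handleReaderProperties()` (and the pre-existing loaders it mirrors): the `FileInputStream` handed to `Properties.load()` is never closed. Should that ever be tightened up, a try-with-resources sketch of the same logic would be:

```
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class PropertiesLoader {

  // load key-value properties from a file, closing the stream when done
  static Properties load(String path) throws IOException {
    Properties props = new Properties();
    try (InputStream in = new FileInputStream(path)) {
      props.load(in);
    }
    return props;
  }
}
```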

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index f5dfe12..d58728a 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -36,12 +36,12 @@ import java.util.function.Supplier;
 public enum BatchProfilerCLIOptions {
 
   PROFILE_DEFN_FILE(() -> {
-    Option o = new Option("p", "profiles", true, "Path to a file containing profile definitions.");
+    Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
     o.setRequired(true);
     return o;
   }),
 
-  CONFIGURATION_FILE(() -> {
+  PROFILER_PROPS_FILE(() -> {
     Option o = new Option("c", "config", true, "Path to the profiler 
properties file.");
     o.setRequired(false);
     return o;
@@ -53,6 +53,12 @@ public enum BatchProfilerCLIOptions {
     return o;
   }),
 
+  READER_PROPS_FILE(() -> {
+    Option o = new Option("r", "reader", true, "Path to properties for the DataFrameReader.");
+    o.setRequired(false);
+    return o;
+  }),
+
   HELP(() -> {
     Option o = new Option("h", "help", false, "Usage instructions.");
     o.setRequired(false);
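
The enum above wraps Apache Commons CLI `Option` instances behind suppliers. A stripped-down sketch of the overall pattern (define each option once, then parse and query) is shown below; it is simplified relative to the real `BatchProfilerCLIOptions`, and the class name is hypothetical.

```
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

enum CLIOptionSketch {
  READER(new Option("r", "reader", true, "Path to properties for the DataFrameReader."));

  private final Option option;

  CLIOptionSketch(Option option) {
    this.option = option;
  }

  boolean has(CommandLine cli) {
    return cli.hasOption(option.getOpt());
  }

  String get(CommandLine cli) {
    return cli.getOptionValue(option.getOpt());
  }

  // collect every option and parse the command line in one pass
  static CommandLine parseArgs(String[] args) throws ParseException {
    Options options = new Options();
    for (CLIOptionSketch o : values()) {
      options.addOption(o.option);
    }
    return new DefaultParser().parse(options, args);
  }
}
```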

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 376623c..87c4246 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -30,11 +30,14 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -82,8 +85,12 @@ public class BatchProfilerIntegrationTest {
   private static String profileJson;
   private static SparkSession spark;
   private Properties profilerProperties;
+  private Properties readerProperties;
   private StellarStatefulExecutor executor;
 
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
   @BeforeClass
   public static void setupSpark() {
     SparkConf conf = new SparkConf()
@@ -105,12 +112,9 @@ public class BatchProfilerIntegrationTest {
 
   @Before
   public void setup() {
+    readerProperties = new Properties();
     profilerProperties = new Properties();
 
-    // the input telemetry is read from the local filesystem
-    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
-    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
-
     // the output will be written to a mock HBase table
     String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
@@ -147,15 +151,80 @@ public class BatchProfilerIntegrationTest {
    * produced will center around this date.
    */
   @Test
-  public void testBatchProfiler() throws Exception {
-    // run the batch profiler
+  public void testBatchProfilerWithJSON() throws Exception {
+    // the input telemetry is text/json stored in the local filesystem
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithORC() throws Exception {
+    // re-write the test data as ORC
+    String pathToORC = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .format("org.apache.spark.sql.execution.datasources.orc")
+            .save(pathToORC);
+
+    // tell the profiler to use the ORC input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "org.apache.spark.sql.execution.datasources.orc");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithCSV() throws Exception {
+    // re-write the test data as a CSV with a header record
+    String pathToCSV = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .option("header", "true")
+            .format("csv")
+            .save(pathToCSV);
+
+    // tell the profiler to use the CSV input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");
+
+    // set a reader property; tell the reader to expect a header
+    readerProperties.put("header", "true");
+
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  /**
+   * Validates the profiles that were built.
+   *
+   * These tests use the Batch Profiler to seed two profiles with archived telemetry.  The first profile,
+   * called 'count-by-ip', counts the number of messages by 'ip_src_addr'.  The second profile, called
+   * 'total-count', counts the total number of messages.
+   */
+  private void validateProfiles() {
+    // the max timestamp in the data is around July 7, 2018
+    assign("maxTimestamp", "1530978728982L");
 
-    // validate the measurements written by the batch profiler using `PROFILE_GET`
-    // the 'window' looks up to 5 hours before the last timestamp contained in the telemetry
-    assign("lastTimestamp", "1530978728982L");
-    assign("window", "PROFILE_WINDOW('from 5 hours ago', lastTimestamp)");
+    // the 'window' looks up to 5 hours before the max timestamp
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
 
     // there are 26 messages where ip_src_addr = 192.168.66.1
    assertTrue(execute("[26] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class));
