Add options support to allow either --input or --database, fixed and added 
tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/579be080
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/579be080
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/579be080

Branch: refs/heads/SPOT-181_ODM
Commit: 579be080f70352fd088a638b5cf09e6f52fb8890
Parents: 65755c2
Author: Curtis Howard <curtis@curtis-MBP.local>
Authored: Thu Jan 25 09:28:25 2018 -0500
Committer: Curtis Howard <curtis@curtis-MBP.local>
Committed: Thu Jan 25 09:28:25 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spot/SuspiciousConnects.scala    |  46 ++++---
 .../spot/SuspiciousConnectsArgumentParser.scala |  47 ++++---
 .../utilities/data/InputOutputDataHandler.scala |  44 +++++-
 .../SuspiciousConnectsArgumentParserTest.scala  | 138 ++++++++++++++++++-
 4 files changed, 226 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala 
b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
index 32b7ca4..f47c3e9 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
@@ -63,32 +63,30 @@ object SuspiciousConnects {
           .enableHiveSupport()
           .getOrCreate()
 
-        /*
-        val inputDataFrame = 
InputOutputDataHandler.getInputDataFrame(sparkSession, config.inputPath, logger)
-          .getOrElse(sparkSession.emptyDataFrame)
-        if(inputDataFrame.rdd.isEmpty()) {
-          logger.error("Couldn't read data from location " + config.inputPath 
+", please verify it's a valid location and that " +
-            s"contains parquet files with a given schema and try again.")
-          System.exit(0)
-        }
-        */
-
-        val hive_query = "SELECT * FROM " + config.database + "." + 
config.dataTable + " where (y=" + config.year + " and m=" + config.month + " 
and d=" + config.day + ")"
+        val inputQuery = "SELECT * FROM " + config.database + "." + 
config.dataTable + " where (y=" +
+                         config.year + " and m=" + config.month + " and d=" + 
config.day + ")"
+      
+        val inputDataFrame = if (config.database.trim.nonEmpty) {
+                               
InputOutputDataHandler.getInputDataFrame(sparkSession, inputQuery, logger, true)
+                                 .getOrElse(sparkSession.emptyDataFrame)
+                             } else {
+                               
InputOutputDataHandler.getInputDataFrame(sparkSession, config.inputPath, 
logger, false)
+                                 .getOrElse(sparkSession.emptyDataFrame)
+                             }
 
-        val inputDataFrame = 
InputOutputDataHandler.getInputDataFrame(sparkSession, hive_query, logger)
-          .getOrElse(sparkSession.emptyDataFrame)
         if(inputDataFrame.rdd.isEmpty()) {
-          logger.error("No records returned for Hive query " + hive_query +", 
please verify that data exists or issues with Hive connection.")
+          val dataFramePath = if (config.database.trim.nonEmpty) " query=(" + 
inputQuery + ")" else " path=(hdfs: " + config.inputPath + ")" 
+          logger.error("No records returned for" + dataFramePath +", please 
verify that data and/or connectivity is available")
           System.exit(0)
         }
 
         val results: Option[SuspiciousConnectsAnalysisResults] = analysis 
match {
-          case "flow" => Some(FlowSuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
-            inputDataFrame))
-          case "dns" => Some(DNSSuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
-            inputDataFrame))
-          case "proxy" => Some(ProxySuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
-            inputDataFrame))
+          case "flow" => FlowSuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
+            inputDataFrame)
+          case "dns" => DNSSuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
+            inputDataFrame)
+          case "proxy" => ProxySuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
+            inputDataFrame)
           case _ => None
         }
 
@@ -115,7 +113,11 @@ object SuspiciousConnects {
             InvalidDataHandler.showAndSaveInvalidRecords(invalidRecords, 
config.hdfsScoredConnect, logger)
           }
 
-          case None => logger.error("Unsupported (or misspelled) analysis: " + 
analysis)
+          case None => logger.error(s"Something went wrong while trying to run 
Suspicious Connects Analysis")
+            logger.error(s"Is the value of the analysis parameter (provided: 
$analysis) any of the valid analysis " +
+              s"types flow/dns/proxy?")
+            logger.error("If analysis type is correct please check for other 
errors like schema not matching or " +
+              "bad spark parameters")
         }
 
         sparkSession.stop()
@@ -134,4 +136,4 @@ object SuspiciousConnects {
   case class SuspiciousConnectsAnalysisResults(val suspiciousConnects: 
DataFrame, val invalidRecords: DataFrame)
 
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
----------------------------------------------------------------------
diff --git 
a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala 
b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
index 7661cbe..49cbeab 100644
--- 
a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
+++ 
b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
@@ -33,29 +33,29 @@ object SuspiciousConnectsArgumentParser {
       action((x, c) => c.copy(analysis = x)).
       text("choice of suspicious connections analysis to perform")
 
-    opt[String]("input").required().valueName("<hdfs path>").
+    opt[String]("input").optional().valueName("<hdfs path>").
       action((x, c) => c.copy(inputPath = x)).
       text("HDFS path to input")
 
-    opt[String]("database").required().valueName("<database name>").
+    opt[String]("database").optional().valueName("<database name>").
       action((x, c) => c.copy(database = x)).
-      text("Database name")
+      text("Database name").children(
+        opt[String]("datatable").required().valueName("<source table name>").
+          action((x, c) => c.copy(dataTable = x)).
+          text("hive table name"),
 
-    opt[String]("datatable").required().valueName("<source table name>").
-      action((x, c) => c.copy(dataTable = x)).
-      text("hive table name")
+        opt[Int]("year").required().valueName("<input year>").
+          action((x, c) => c.copy(year = x)).
+          text("input year"),
 
-    opt[String]("year").required().valueName("<input year>").
-      action((x, c) => c.copy(year = x)).
-      text("input year")
+        opt[Int]("month").required().valueName("<input month>").
+          action((x, c) => c.copy(month = x)).
+          text("input month"),
 
-    opt[String]("month").required().valueName("<input month>").
-      action((x, c) => c.copy(month = x)).
-      text("input month")
-
-    opt[String]("day").required().valueName("<input day>").
-      action((x, c) => c.copy(day = x)).
-      text("input day")
+        opt[Int]("day").required().valueName("<input day>").
+          action((x, c) => c.copy(day = x)).
+          text("input day")
+      )
 
     opt[String]("feedback").valueName("<local file>").
       action((x, c) => c.copy(feedbackFile = x)).
@@ -116,15 +116,24 @@ object SuspiciousConnectsArgumentParser {
     opt[String]("ldaoptimizer").optional().valueName("lda optimizer").
       action((x, c) => c.copy(ldaOptimizer = x)).
       text("LDA Optimizer: em for EM Optimizer or online Online Optimizer")
+
+    checkConfig { params =>
+      if ((!params.inputPath.isEmpty  && !params.database.isEmpty) || 
+          (params.inputPath.isEmpty   && params.database.isEmpty)) {
+        failure(s"(input) and (database) options are mutually exclusive - 
please use one or the other.")
+      } else {
+        success
+      }
+    }
   }
 
   case class SuspiciousConnectsConfig(analysis: String = "",
                                       inputPath: String = "",
                                       database: String = "",
                                       dataTable: String = "",
-                                      year: String = "",
-                                      month: String = "",
-                                      day: String = "",
+                                      year: Int = 0,
+                                      month: Int = 0,
+                                      day: Int = 0,
                                       feedbackFile: String = "",
                                       duplicationFactor: Int = 1,
                                       topicCount: Int = 20,

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
----------------------------------------------------------------------
diff --git 
a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
 
b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
index d302d16..77fd79f 100644
--- 
a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
+++ 
b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spot.utilities.data
 
-import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileUtil 
=> fileUtil}
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, 
RemoteIterator, FileUtil => fileUtil}
 import org.apache.log4j.Logger
 import org.apache.spark.sql.{DataFrame, SparkSession}
-
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 /**
   * Handles input and output data for every data set or pipep line 
implementation.
@@ -31,15 +32,18 @@ object InputOutputDataHandler {
   /**
     *
     * @param sparkSession Spark Session.
-    * @param inputPath    HDFS input folder for every execution; flow, dns or 
proxy.
+    * @param inputPath    SQL expression to select relevant data for flow, dns 
or proxy OR hdfs input data path.
     * @param logger       Application logger.
+    * @param isHive       whether or not inputPath is a Hive query (true) OR 
an HDFS input data path (false).
     * @return raw data frame.
     */
-  def getInputDataFrame(sparkSession: SparkSession, inputPath: String, logger: 
Logger): Option[DataFrame] = {
+  def getInputDataFrame(sparkSession: SparkSession, inputPath: String, logger: 
Logger, isHive: Boolean): Option[DataFrame] = {
     try {
       logger.info("Loading data from: " + inputPath)
-      //Some(sparkSession.read.parquet(inputPath))
-      Some(SparkSession.sql(inputPath))
+      if (isHive) 
+        Some(sparkSession.sql(inputPath))
+      else 
+        Some(sparkSession.read.parquet(inputPath))
     } catch {
       case _: Throwable => None
     }
@@ -48,6 +52,34 @@ object InputOutputDataHandler {
   /**
     *
     * @param sparkSession      Spark Session.
+    * @param feedbackFile Feedback file location.
+    * @return new RDD[String] with feedback or empty if file does not exists.
+    */
+  def getFeedbackRDD(sparkSession: SparkSession, feedbackFile: String): 
RDD[String] = {
+
+    val hadoopConfiguration = sparkSession.sparkContext.hadoopConfiguration
+    val fs = FileSystem.get(hadoopConfiguration)
+
+    // We need to pass a default value "file" if fileName is "" to avoid error
+    // java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
+    // when trying to create a new Path object with empty string.
+    val fileExists = fs.exists(new Path(if (feedbackFile == "") "file" else 
feedbackFile))
+
+    if (fileExists) {
+
+      // feedback file is a tab-separated file with a single header line. We 
need to remove the header
+      val lines = sparkSession.sparkContext.textFile(feedbackFile)
+      val header = lines.first()
+      lines.filter(line => line != header)
+
+    } else {
+      sparkSession.sparkContext.emptyRDD[String]
+    }
+  }
+
+  /**
+    *
+    * @param sparkSession      Spark Session
     * @param hdfsScoredConnect HDFS output folder. The location where results 
were saved; flow, dns or proxy.
     * @param analysis          Data type to analyze.
     * @param logger            Application Logger.

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
----------------------------------------------------------------------
diff --git 
a/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
 
b/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
index f61b5ed..47aeea9 100644
--- 
a/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
+++ 
b/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spot
 
 import 
org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
@@ -5,7 +22,50 @@ import org.scalatest.{FlatSpec, Matchers}
 
 class SuspiciousConnectsArgumentParserTest extends FlatSpec with Matchers {
 
-  "Argument parser" should "parse parameters correctly" in {
+  "Argument parser" should "parse (database/table input) parameters correctly" 
in {
+
+    val args = Array("--analysis", "dns",
+      "--database", "testdb",
+      "--datatable", "testtable",
+      "--year", "2017",
+      "--month", "11",
+      "--day", "1",
+      "--dupfactor", "1000",
+      "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
+      "--ldatopiccount", "10",
+      "--scored", "/user/user/dns/test/scored_results/scores",
+      "--threshold", "1.1",
+      "--maxresults", "20",
+      "--ldamaxiterations", "20",
+      "--ldaalpha", "0.0009",
+      "--ldabeta", "0.00001",
+      "--ldaoptimizer", "online",
+      "--precision", "64",
+      "--userdomain", "mycompany")
+
+    val parser = SuspiciousConnectsArgumentParser.parser
+    val config = parser.parse(args, SuspiciousConnectsConfig()) match {
+      case Some(config) => config
+      case None => SuspiciousConnectsConfig()
+    }
+
+    config.analysis shouldBe "dns"
+    config.inputPath shouldBe ""
+    config.database shouldBe "testdb"
+    config.dataTable shouldBe "testtable"
+    config.year shouldBe 2017
+    config.month shouldBe 11
+    config.day shouldBe 1 
+    config.duplicationFactor shouldBe 1000
+    config.topicCount shouldBe 10
+    config.ldaAlpha shouldBe 0.0009
+    config.ldaBeta shouldBe 0.00001
+    config.ldaOptimizer shouldBe "online"
+
+  }
+
+
+  "Argument parser" should "parse (HDFS Parquet input) parameters correctly" 
in {
 
     val args = Array("--analysis", "dns",
       "--input", "user/spot-data",
@@ -29,6 +89,8 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec 
with Matchers {
     }
 
     config.analysis shouldBe "dns"
+    config.inputPath shouldBe "user/spot-data"
+    config.database shouldBe ""
     config.duplicationFactor shouldBe 1000
     config.topicCount shouldBe 10
     config.ldaAlpha shouldBe 0.0009
@@ -39,7 +101,11 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec 
with Matchers {
 
   it should "parse parameters and take defaults when optional parameters are 
not provided" in {
     val args = Array("--analysis", "dns",
-      "--input", "user/spot-data",
+      "--database", "testdb",
+      "--datatable", "testtable",
+      "--year", "2017",
+      "--month", "11",
+      "--day", "1",
       "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
       "--ldatopiccount", "10",
       "--scored", "/user/user/dns/test/scored_results/scores",
@@ -54,6 +120,11 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec 
with Matchers {
     }
 
     config.analysis shouldBe "dns"
+    config.database shouldBe "testdb"
+    config.dataTable shouldBe "testtable"
+    config.year shouldBe 2017
+    config.month shouldBe 11
+    config.day shouldBe 1 
     config.duplicationFactor shouldBe 1
     config.topicCount shouldBe 10
     config.ldaAlpha shouldBe 1.02
@@ -61,4 +132,67 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec 
with Matchers {
     config.ldaOptimizer shouldBe "em"
     config.outputDelimiter shouldBe "\t"
   }
+
+
+  "Argument parser" should "return failure if both input HDFS path and input 
database/table are provided" in {
+
+    val args = Array("--analysis", "dns",
+      "--input", "user/spot-data",
+      "--database", "testdb",
+      "--datatable", "testtable",
+      "--year", "2017",
+      "--month", "11",
+      "--day", "1", 
+      "--dupfactor", "1000",
+      "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
+      "--ldatopiccount", "10",
+      "--scored", "/user/user/dns/test/scored_results/scores",
+      "--threshold", "1.1",
+      "--maxresults", "20",
+      "--ldamaxiterations", "20",
+      "--ldaalpha", "0.0009",
+      "--ldabeta", "0.00001",
+      "--ldaoptimizer", "online",
+      "--precision", "64",
+      "--userdomain", "mycompany")
+
+    val parser = SuspiciousConnectsArgumentParser.parser
+    val config = parser.parse(args, SuspiciousConnectsConfig()) match {
+      case Some(config) => config
+      case None => SuspiciousConnectsConfig()
+    }
+
+    config.analysis shouldBe ""
+  }
+
+
+  "Argument parser" should "return failure if any input database/table 
parameters are missing" in {
+
+    val args = Array("--analysis", "dns",
+      "--input", "user/spot-data",
+      "--database", "testdb",
+      "--datatable", "testtable",
+      "--year", "2017",
+      "--day", "1", 
+      "--dupfactor", "1000",
+      "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
+      "--ldatopiccount", "10",
+      "--scored", "/user/user/dns/test/scored_results/scores",
+      "--threshold", "1.1",
+      "--maxresults", "20",
+      "--ldamaxiterations", "20",
+      "--ldaalpha", "0.0009",
+      "--ldabeta", "0.00001",
+      "--ldaoptimizer", "online",
+      "--precision", "64",
+      "--userdomain", "mycompany")
+
+    val parser = SuspiciousConnectsArgumentParser.parser
+    val config = parser.parse(args, SuspiciousConnectsConfig()) match {
+      case Some(config) => config
+      case None => SuspiciousConnectsConfig()
+    }
+
+    config.analysis shouldBe ""
+  }
 }

Reply via email to