Repository: incubator-spot
Updated Branches:
  refs/heads/SPOT-181_ODM e48c5f4dc -> 97291e90d


Changes to ML and OA to read data from Hive tables instead of the Parquet path


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/65755c25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/65755c25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/65755c25

Branch: refs/heads/SPOT-181_ODM
Commit: 65755c252d53adc6cb7effb8b0dae5f88e9df583
Parents: 5f25155
Author: anilreddydonthireddy <anil.donthire...@sstech.us>
Authored: Tue Jul 18 16:38:49 2017 +0530
Committer: anilreddydonthireddy <anil.donthire...@sstech.us>
Committed: Tue Jul 18 16:38:49 2017 +0530

----------------------------------------------------------------------
 spot-ml/ml_ops.sh                               | 13 ++++++++++
 .../org/apache/spot/SuspiciousConnects.scala    | 12 ++++++++++
 .../spot/SuspiciousConnectsArgumentParser.scala | 25 ++++++++++++++++++++
 .../utilities/data/InputOutputDataHandler.scala |  3 ++-
 spot-oa/oa/dns/dns_oa.py                        |  3 ++-
 spot-oa/oa/flow/flow_oa.py                      |  3 ++-
 spot-oa/oa/proxy/proxy_oa.py                    |  3 ++-
 spot-setup/spot.conf                            |  4 ++++
 8 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-ml/ml_ops.sh
----------------------------------------------------------------------
diff --git a/spot-ml/ml_ops.sh b/spot-ml/ml_ops.sh
index 4536676..adf09d6 100755
--- a/spot-ml/ml_ops.sh
+++ b/spot-ml/ml_ops.sh
@@ -62,6 +62,14 @@ else
     RAWDATA_PATH=${PROXY_PATH}
 fi
 
+if [ "$DSOURCE" == "flow" ]; then
+    DATA_TABLE=${FLOW_TABLE}
+elif [ "$DSOURCE" == "dns" ]; then
+    DATA_TABLE=${DNS_TABLE}
+else
+    DATA_TABLE=${PROXY_TABLE}
+fi
+
 # pass the user domain designation if not empty
 
 if [ ! -z $USER_DOMAIN ] ; then
@@ -98,6 +106,11 @@ time spark-submit --class "org.apache.spot.SuspiciousConnects" \
  --conf spark.yarn.executor.memoryOverhead=${SPK_EXEC_MEM_OVERHEAD} target/scala-2.11/spot-ml-assembly-1.1.jar \
   --analysis ${DSOURCE} \
   --input ${RAWDATA_PATH}  \
+  --database ${DBNAME} \
+  --datatable ${DATA_TABLE} \
+  --year ${YR} \
+  --month ${MH} \
+  --day ${DY} \
   --dupfactor ${DUPFACTOR} \
   --feedback ${FEEDBACK_PATH} \
   --ldatopiccount ${TOPIC_COUNT} \

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
index f3f122e..32b7ca4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
@@ -60,8 +60,10 @@ object SuspiciousConnects {
         val sparkSession = SparkSession.builder
           .appName("Spot ML:  " + analysis + " suspicious connects analysis")
           .master("yarn")
+          .enableHiveSupport()
           .getOrCreate()
 
+        /*
         val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sparkSession, config.inputPath, logger)
           .getOrElse(sparkSession.emptyDataFrame)
         if(inputDataFrame.rdd.isEmpty()) {
@@ -69,6 +71,16 @@ object SuspiciousConnects {
             s"contains parquet files with a given schema and try again.")
           System.exit(0)
         }
+        */
+
+        val hive_query = "SELECT * FROM " + config.database + "." + config.dataTable + " where (y=" + config.year + " and m=" + config.month + " and d=" + config.day + ")"
+
+        val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sparkSession, hive_query, logger)
+          .getOrElse(sparkSession.emptyDataFrame)
+        if(inputDataFrame.rdd.isEmpty()) {
+          logger.error("No records returned for Hive query " + hive_query + "; please verify that the data exists and that the Hive connection is working.")
+          System.exit(0)
+        }
 
         val results: Option[SuspiciousConnectsAnalysisResults] = analysis match {
           case "flow" => Some(FlowSuspiciousConnectsAnalysis.run(config, 
sparkSession, logger,
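
For reference, a minimal sketch of the query string the new code assembles, using placeholder values (a "spotdb" database, the "flow_view" table, 2017-07-18) that are not part of this commit:

    // Sketch only: mirrors the concatenation added above; database, table and
    // date are placeholders rather than values taken from spot.conf.
    object HiveQuerySketch {
      def main(args: Array[String]): Unit = {
        val database  = "spotdb"
        val dataTable = "flow_view"
        val (year, month, day) = ("2017", "07", "18")

        val hiveQuery = "SELECT * FROM " + database + "." + dataTable +
          " where (y=" + year + " and m=" + month + " and d=" + day + ")"

        println(hiveQuery)
        // prints: SELECT * FROM spotdb.flow_view where (y=2017 and m=07 and d=18)
      }
    }

The y/m/d predicates line up with the y=/m=/d= partition layout already used for the Hive-backed paths in spot.conf, so Hive should be able to prune the scan to a single day.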

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
index 5eea0e0..7661cbe 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
@@ -37,6 +37,26 @@ object SuspiciousConnectsArgumentParser {
       action((x, c) => c.copy(inputPath = x)).
       text("HDFS path to input")
 
+    opt[String]("database").required().valueName("<database name>").
+      action((x, c) => c.copy(database = x)).
+      text("Database name")
+
+    opt[String]("datatable").required().valueName("<source table name>").
+      action((x, c) => c.copy(dataTable = x)).
+      text("hive table name")
+
+    opt[String]("year").required().valueName("<input year>").
+      action((x, c) => c.copy(year = x)).
+      text("input year")
+
+    opt[String]("month").required().valueName("<input month>").
+      action((x, c) => c.copy(month = x)).
+      text("input month")
+
+    opt[String]("day").required().valueName("<input day>").
+      action((x, c) => c.copy(day = x)).
+      text("input day")
+
     opt[String]("feedback").valueName("<local file>").
       action((x, c) => c.copy(feedbackFile = x)).
       text("the local path of the file that contains the feedback scores")
@@ -100,6 +120,11 @@ object SuspiciousConnectsArgumentParser {
 
   case class SuspiciousConnectsConfig(analysis: String = "",
                                       inputPath: String = "",
+                                      database: String = "",
+                                      dataTable: String = "",
+                                      year: String = "",
+                                      month: String = "",
+                                      day: String = "",
                                       feedbackFile: String = "",
                                       duplicationFactor: Int = 1,
                                       topicCount: Int = 20,
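
For illustration, a stand-alone sketch of how the five new required options would be parsed, assuming the scopt library the existing parser is built on; HiveArgs and ArgSketch are placeholder names, not part of spot-ml:

    import scopt.OptionParser

    // Stand-in config carrying only the new fields; the real ones were added to
    // SuspiciousConnectsConfig in the hunk above.
    case class HiveArgs(database: String = "", dataTable: String = "",
                        year: String = "", month: String = "", day: String = "")

    object ArgSketch {
      val parser = new OptionParser[HiveArgs]("arg-sketch") {
        opt[String]("database").required().action((x, c) => c.copy(database = x))
        opt[String]("datatable").required().action((x, c) => c.copy(dataTable = x))
        opt[String]("year").required().action((x, c) => c.copy(year = x))
        opt[String]("month").required().action((x, c) => c.copy(month = x))
        opt[String]("day").required().action((x, c) => c.copy(day = x))
      }

      def main(args: Array[String]): Unit =
        // e.g. --database spotdb --datatable flow_view --year 2017 --month 07 --day 18
        parser.parse(args, HiveArgs()).foreach(println)
    }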

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
index 481e389..d302d16 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
@@ -38,7 +38,8 @@ object InputOutputDataHandler {
   def getInputDataFrame(sparkSession: SparkSession, inputPath: String, logger: Logger): Option[DataFrame] = {
     try {
       logger.info("Loading data from: " + inputPath)
-      Some(sparkSession.read.parquet(inputPath))
+      //Some(sparkSession.read.parquet(inputPath))
+      Some(sparkSession.sql(inputPath))
     } catch {
       case _: Throwable => None
     }
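
Note that sql is an instance method on SparkSession rather than a member of the companion object, so the handler has to invoke it on the session built with enableHiveSupport(). A minimal sketch of the resulting read path, using a placeholder query and object name, might look like this:

    import org.apache.spark.sql.SparkSession

    // Sketch only: query text, app name and object name are placeholders.
    object HiveReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("hive-read-sketch")
          .enableHiveSupport()
          .getOrCreate()

        val query = "SELECT * FROM spotdb.flow_view where (y=2017 and m=7 and d=18)"

        // Same shape as getInputDataFrame: run the query, fall back to an empty frame on failure.
        val df = try { spark.sql(query) } catch { case _: Throwable => spark.emptyDataFrame }

        if (df.rdd.isEmpty()) println("No records returned for Hive query " + query)
        else df.show(10)

        spark.stop()
      }
    }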

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-oa/oa/dns/dns_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/dns/dns_oa.py b/spot-oa/oa/dns/dns_oa.py
index 2033c89..a923cc2 100644
--- a/spot-oa/oa/dns/dns_oa.py
+++ b/spot-oa/oa/dns/dns_oa.py
@@ -47,7 +47,7 @@ class OA(object):
         # initialize required parameters.
         self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
         self._date = date
-        self._table_name = "dns"
+        #self._table_name = "dns"
         self._dns_results = []
         self._limit = limit
         self._data_path = None
@@ -67,6 +67,7 @@ class OA(object):
 
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+        self._table_name = self._spot_conf.get('conf', 'DNS_TABLE')
         self._engine = Data(self._db,self._table_name ,self._logger) 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-oa/oa/flow/flow_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/flow/flow_oa.py b/spot-oa/oa/flow/flow_oa.py
index 027b54f..cc2318b 100644
--- a/spot-oa/oa/flow/flow_oa.py
+++ b/spot-oa/oa/flow/flow_oa.py
@@ -49,7 +49,7 @@ class OA(object):
         # initialize required parameters.
         self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
         self._date = date
-        self._table_name = "flow"
+        #self._table_name = "flow"
         self._flow_results = []
         self._limit = limit
         self._data_path = None
@@ -67,6 +67,7 @@ class OA(object):
  
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+        self._table_name = self._spot_conf.get('conf', 'FLOW_TABLE')
         self._engine = Data(self._db, self._table_name,self._logger)
                       
     def start(self):       

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-oa/oa/proxy/proxy_oa.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/proxy/proxy_oa.py b/spot-oa/oa/proxy/proxy_oa.py
index 1324f1a..d16769c 100644
--- a/spot-oa/oa/proxy/proxy_oa.py
+++ b/spot-oa/oa/proxy/proxy_oa.py
@@ -48,7 +48,7 @@ class OA(object):
         # initialize required parameters.
         self._scrtip_path = os.path.dirname(os.path.abspath(__file__))
         self._date = date
-        self._table_name = "proxy"
+        #self._table_name = "proxy"
         self._proxy_results = []
         self._limit = limit
         self._data_path = None
@@ -68,6 +68,7 @@ class OA(object):
 
         # initialize data engine
         self._db = self._spot_conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+        self._table_name = self._spot_conf.get('conf', 'PROXY_TABLE')
         self._engine = Data(self._db, self._table_name,self._logger)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/65755c25/spot-setup/spot.conf
----------------------------------------------------------------------
diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf
index 0209f4e..abe8030 100755
--- a/spot-setup/spot.conf
+++ b/spot-setup/spot.conf
@@ -11,6 +11,10 @@ PROXY_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/
 FLOW_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/
 HPATH=${HUSER}/${DSOURCE}/scored_results/${FDATE}
 
+FLOW_TABLE=flow_view
+DNS_TABLE=dns_view
+PROXY_TABLE=proxy_view
+
 #impala config
 IMPALA_DEM='node04'
 
